ephraimbuddy commented on code in PR #41433:
URL: https://github.com/apache/airflow/pull/41433#discussion_r1731279219


##########
tests/dag_processing/test_job_runner.py:
##########
@@ -762,6 +763,95 @@ def test_scan_stale_dags_standalone_mode(self):
             active_dag_count = 
session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
             assert active_dag_count == 1
 
+    @conf_vars(
+        {
+            ("scheduler", "standalone_dag_processor"): "False",

Review Comment:
   This is False by default, right? Should we also check that, when it is 
enabled, it doesn't affect the standalone processor?



##########
tests/dag_processing/test_job_runner.py:
##########
@@ -762,6 +763,95 @@ def test_scan_stale_dags_standalone_mode(self):
             active_dag_count = 
session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
             assert active_dag_count == 1
 
+    @conf_vars(
+        {
+            ("scheduler", "standalone_dag_processor"): "False",
+        }
+    )
+    def test_scan_stale_dags_when_dag_folder_change(self):
+        """
+        Ensure DAGs from the old dag_folder are marked as stale when the dag processor
+         is running as part of the scheduler.
+        """
+
+        def get_dag_string(dag_id) -> str:
+            return (
+                f"from __future__ import annotations\n"
+                f"import datetime\n"
+                f"from airflow.models.dag import DAG\n"
+                f"from airflow.operators.empty import EmptyOperator\n"
+                f"with DAG(\n"
+                f'  dag_id="{dag_id}",\n'
+                f"  schedule=datetime.timedelta(hours=4),\n"
+                f"  start_date=datetime.datetime(2021, 1, 1),\n"
+                f"  catchup=False,\n"
+                f") as dag:\n"
+                f'    task1 = EmptyOperator(task_id="task1")\n'

Review Comment:
   Can you use a DAG from the tests/dags directory instead of this?



##########
tests/dag_processing/test_job_runner.py:
##########
@@ -762,6 +763,95 @@ def test_scan_stale_dags_standalone_mode(self):
             active_dag_count = 
session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
             assert active_dag_count == 1
 
+    @conf_vars(
+        {
+            ("scheduler", "standalone_dag_processor"): "False",
+        }
+    )
+    def test_scan_stale_dags_when_dag_folder_change(self):
+        """
+        Ensure DAGs from the old dag_folder are marked as stale when the dag processor
+         is running as part of the scheduler.
+        """
+
+        def get_dag_string(dag_id) -> str:
+            return (
+                f"from __future__ import annotations\n"
+                f"import datetime\n"
+                f"from airflow.models.dag import DAG\n"
+                f"from airflow.operators.empty import EmptyOperator\n"
+                f"with DAG(\n"
+                f'  dag_id="{dag_id}",\n'
+                f"  schedule=datetime.timedelta(hours=4),\n"
+                f"  start_date=datetime.datetime(2021, 1, 1),\n"
+                f"  catchup=False,\n"
+                f") as dag:\n"
+                f'    task1 = EmptyOperator(task_id="task1")\n'
+            )
+
+        with tempfile.TemporaryDirectory() as tmpdir:
+            old_dag_home = tempfile.mkdtemp(dir=tmpdir)
+            old_dag_file = tempfile.NamedTemporaryFile(dir=old_dag_home, 
suffix=".py")
+            old_dag_file.write(get_dag_string("old_temp_dag").encode())
+            old_dag_file.flush()
+            new_dag_home = tempfile.mkdtemp(dir=tmpdir)
+            new_dag_file = tempfile.NamedTemporaryFile(dir=new_dag_home, 
suffix=".py")
+            new_dag_file.write(get_dag_string("new_temp_dag").encode())
+            new_dag_file.flush()
+
+            manager = DagProcessorJobRunner(
+                job=Job(),
+                processor=DagFileProcessorManager(
+                    dag_directory=new_dag_home,
+                    max_runs=1,
+                    processor_timeout=timedelta(minutes=10),
+                    signal_conn=MagicMock(),
+                    dag_ids=[],
+                    pickle_dags=False,
+                    async_mode=True,
+                ),
+            )
+
+            dagbag = DagBag(old_dag_file.name, read_dags_from_db=False)
+            other_dagbag = DagBag(new_dag_file.name, read_dags_from_db=False)
+
+            with create_session() as session:
+                # Add DAG from the old DAG home to the DB
+                dag = dagbag.get_dag("old_temp_dag")
+                dag.fileloc = old_dag_file.name
+                dag.last_parsed_time = timezone.utcnow()
+                dag.sync_to_db(processor_subdir=old_dag_home)
+
+                # Add DAG from new DAG home to the DB
+                other_dag = other_dagbag.get_dag("new_temp_dag")
+                other_dag.fileloc = new_dag_file.name
+                other_dag.last_parsed_time = timezone.utcnow()
+                other_dag.sync_to_db(processor_subdir=new_dag_home)
+
+                # Add DAG to the file_parsing_stats
+                stat = DagFileStat(

Review Comment:
   Is this stat needed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to