This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-10-test by this push:
new 85c4ef9b50 Revert "Fix: DAGs are not marked as stale if the dags
folder change" (41433) (#41829) (#41893) 41829 (#42220)
85c4ef9b50 is described below
commit 85c4ef9b505a2f7c94d2569f273a188f7f0c8c74
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Fri Sep 13 10:20:24 2024 -0700
Revert "Fix: DAGs are not marked as stale if the dags folder change"
(41433) (#41829) (#41893) 41829 (#42220)
* Revert "Skip test_scan_stale_dags_when_dag_folder_change in DB isolation
mode (#41893)"
This reverts commit 07af14ae75820a98b60ecffa2949ef7ad70bacab.
* Revert "Fix: DAGs are not marked as stale if the dags folder change
(#41433) (#41829)"
This reverts commit 996af78376e48dca34d469a9bcca569647ae17de.
---
airflow/dag_processing/manager.py | 10 +----
tests/dag_processing/test_job_runner.py | 72 ++-------------------------------
tests/jobs/test_scheduler_job.py | 1 -
3 files changed, 6 insertions(+), 77 deletions(-)
diff --git a/airflow/dag_processing/manager.py
b/airflow/dag_processing/manager.py
index 819da5d7e1..c03bc074d0 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -526,20 +526,14 @@ class DagFileProcessorManager(LoggingMixin):
dags_parsed = session.execute(query)
for dag in dags_parsed:
- # When the DAG processor runs as part of the scheduler, and the
user changes the DAGs folder,
- # DAGs from the previous DAGs folder will be marked as stale. Note
that this change has no impact
- # on standalone DAG processors.
- dag_not_in_current_dag_folder = os.path.commonpath([dag.fileloc,
dag_directory]) != dag_directory
# The largest valid difference between a DagFileStat's
last_finished_time and a DAG's
# last_parsed_time is the processor_timeout. Longer than that
indicates that the DAG is
# no longer present in the file. We have a stale_dag_threshold
configured to prevent a
# significant delay in deactivation of stale dags when a large
timeout is configured
- dag_removed_from_dag_folder_or_file = (
+ if (
dag.fileloc in last_parsed
and (dag.last_parsed_time +
timedelta(seconds=stale_dag_threshold)) < last_parsed[dag.fileloc]
- )
-
- if dag_not_in_current_dag_folder or
dag_removed_from_dag_folder_or_file:
+ ):
cls.logger().info("DAG %s is missing and will be
deactivated.", dag.dag_id)
to_deactivate.add(dag.dag_id)
diff --git a/tests/dag_processing/test_job_runner.py
b/tests/dag_processing/test_job_runner.py
index 9b8437d77d..8112b7222a 100644
--- a/tests/dag_processing/test_job_runner.py
+++ b/tests/dag_processing/test_job_runner.py
@@ -26,7 +26,6 @@ import pathlib
import random
import socket
import sys
-import tempfile
import textwrap
import threading
import time
@@ -639,7 +638,7 @@ class TestDagProcessorJobRunner:
manager = DagProcessorJobRunner(
job=Job(),
processor=DagFileProcessorManager(
- dag_directory=str(TEST_DAG_FOLDER),
+ dag_directory="directory",
max_runs=1,
processor_timeout=timedelta(minutes=10),
signal_conn=MagicMock(),
@@ -713,11 +712,11 @@ class TestDagProcessorJobRunner:
"""
Ensure only dags from current dag_directory are updated
"""
- dag_directory = str(TEST_DAG_FOLDER)
+ dag_directory = "directory"
manager = DagProcessorJobRunner(
job=Job(),
processor=DagFileProcessorManager(
- dag_directory=TEST_DAG_FOLDER,
+ dag_directory=dag_directory,
max_runs=1,
processor_timeout=timedelta(minutes=10),
signal_conn=MagicMock(),
@@ -741,7 +740,7 @@ class TestDagProcessorJobRunner:
# Add stale DAG to the DB
other_dag = other_dagbag.get_dag("test_start_date_scheduling")
other_dag.last_parsed_time = timezone.utcnow()
- other_dag.sync_to_db(processor_subdir="/other")
+ other_dag.sync_to_db(processor_subdir="other")
# Add DAG to the file_parsing_stats
stat = DagFileStat(
@@ -763,69 +762,6 @@ class TestDagProcessorJobRunner:
active_dag_count =
session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
assert active_dag_count == 1
- @pytest.mark.skip_if_database_isolation_mode # Test is broken in db
isolation mode
- def test_scan_stale_dags_when_dag_folder_change(self):
- """
- Ensure dags from old dag_folder is marked as stale when dag processor
- is running as part of scheduler.
- """
-
- def get_dag_string(filename) -> str:
- return open(TEST_DAG_FOLDER / filename).read()
-
- with tempfile.TemporaryDirectory() as tmpdir:
- old_dag_home = tempfile.mkdtemp(dir=tmpdir)
- old_dag_file = tempfile.NamedTemporaryFile(dir=old_dag_home,
suffix=".py")
-
old_dag_file.write(get_dag_string("test_example_bash_operator.py").encode())
- old_dag_file.flush()
- new_dag_home = tempfile.mkdtemp(dir=tmpdir)
- new_dag_file = tempfile.NamedTemporaryFile(dir=new_dag_home,
suffix=".py")
-
new_dag_file.write(get_dag_string("test_scheduler_dags.py").encode())
- new_dag_file.flush()
-
- manager = DagProcessorJobRunner(
- job=Job(),
- processor=DagFileProcessorManager(
- dag_directory=new_dag_home,
- max_runs=1,
- processor_timeout=timedelta(minutes=10),
- signal_conn=MagicMock(),
- dag_ids=[],
- pickle_dags=False,
- async_mode=True,
- ),
- )
-
- dagbag = DagBag(old_dag_file.name, read_dags_from_db=False)
- other_dagbag = DagBag(new_dag_file.name, read_dags_from_db=False)
-
- with create_session() as session:
- # Add DAG from old dag home to the DB
- dag = dagbag.get_dag("test_example_bash_operator")
- dag.fileloc = old_dag_file.name
- dag.last_parsed_time = timezone.utcnow()
- dag.sync_to_db(processor_subdir=old_dag_home)
-
- # Add DAG from new DAG home to the DB
- other_dag = other_dagbag.get_dag("test_start_date_scheduling")
- other_dag.fileloc = new_dag_file.name
- other_dag.last_parsed_time = timezone.utcnow()
- other_dag.sync_to_db(processor_subdir=new_dag_home)
-
- manager.processor._file_paths = [new_dag_file]
-
- active_dag_count = (
-
session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
- )
- assert active_dag_count == 2
-
- manager.processor._scan_stale_dags()
-
- active_dag_count = (
-
session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
- )
- assert active_dag_count == 1
-
@mock.patch(
"airflow.dag_processing.processor.DagFileProcessorProcess.waitable_handle",
new_callable=PropertyMock
)
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 38bde8bf26..2e96728d5e 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -3568,7 +3568,6 @@ class TestSchedulerJob:
dag_id="test_retry_still_in_executor",
schedule="@once",
session=session,
- fileloc=os.devnull + "/test_retry_still_in_executor.py",
):
dag_task1 = BashOperator(
task_id="test_retry_handling_op",