This is an automated email from the ASF dual-hosted git repository.
utkarsharma pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6235cf8975 Revert "Handle Example dags case when checking for missing files (#41856)" (#42193)
6235cf8975 is described below
commit 6235cf897598b03ace2e52bb677b478a5612ccad
Author: Utkarsh Sharma <[email protected]>
AuthorDate: Thu Sep 12 19:03:24 2024 +0530
Revert "Handle Example dags case when checking for missing files (#41856)"
(#42193)
This reverts commit 435e9687b0c56499bc29c21d3cada8ae9e0a8c53.
---
airflow/dag_processing/manager.py | 11 ++--
tests/dag_processing/test_job_runner.py | 89 +++++++++++++++++----------------
2 files changed, 48 insertions(+), 52 deletions(-)
diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 7e404307dc..fee515dc07 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -41,7 +41,6 @@ from sqlalchemy import delete, select, update
from tabulate import tabulate
import airflow.models
-from airflow import example_dags
from airflow.api_internal.internal_api_call import internal_api_call
from airflow.callbacks.callback_requests import CallbackRequest, SlaCallbackRequest
from airflow.configuration import conf
@@ -70,8 +69,6 @@ from airflow.utils.retries import retry_db_transaction
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import prohibit_commit, with_row_locks
-example_dag_folder = next(iter(example_dags.__path__))
-
if TYPE_CHECKING:
from multiprocessing.connection import Connection as MultiprocessingConnection
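For reference, a minimal sketch of what the removed example_dag_folder lookup did: a package's __path__ is an iterable of directories, and the first entry is taken as the package's on-disk folder.

    # Resolve the example DAGs package to a filesystem directory.
    # __path__ is an iterable of directories; next(iter(...)) takes the first.
    from airflow import example_dags

    example_dag_folder = next(iter(example_dags.__path__))
    print(example_dag_folder)  # e.g. .../site-packages/airflow/example_dags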
@@ -530,11 +527,9 @@ class DagFileProcessorManager(LoggingMixin):
for dag in dags_parsed:
# When the DAG processor runs as part of the scheduler, and the user changes the DAGs folder,
- # DAGs from the previous DAGs folder will be marked as stale. We also need to handle example dags
- # differently. Note that this change has no impact on standalone DAG processors.
- dag_not_in_current_dag_folder = (
- not os.path.commonpath([dag.fileloc, example_dag_folder]) == example_dag_folder
- ) and (os.path.commonpath([dag.fileloc, dag_directory]) != dag_directory)
+ # DAGs from the previous DAGs folder will be marked as stale. Note that this change has no impact
+ # on standalone DAG processors.
+ dag_not_in_current_dag_folder = os.path.commonpath([dag.fileloc, dag_directory]) != dag_directory
# The largest valid difference between a DagFileStat's last_finished_time and a DAG's
# last_parsed_time is the processor_timeout. Longer than that indicates that the DAG is
# no longer present in the file. We have a stale_dag_threshold configured to prevent a
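For context, a minimal standalone sketch of the restored check; dag_directory and the file locations below are illustrative, not taken from the patch.

    import os

    # A DAG is considered outside the current DAGs folder when the common
    # path of its fileloc and dag_directory is not dag_directory itself.
    dag_directory = "/opt/airflow/dags"  # illustrative path

    for fileloc in ("/opt/airflow/dags/etl.py", "/old/dags/etl.py"):
        stale = os.path.commonpath([fileloc, dag_directory]) != dag_directory
        print(fileloc, "-> stale" if stale else "-> current")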
diff --git a/tests/dag_processing/test_job_runner.py b/tests/dag_processing/test_job_runner.py
index 0e15a2d1f6..9b8437d77d 100644
--- a/tests/dag_processing/test_job_runner.py
+++ b/tests/dag_processing/test_job_runner.py
@@ -773,57 +773,58 @@ class TestDagProcessorJobRunner:
def get_dag_string(filename) -> str:
return open(TEST_DAG_FOLDER / filename).read()
- def add_dag_to_db(file_path, dag_id, processor_subdir):
- dagbag = DagBag(file_path, read_dags_from_db=False)
- dag = dagbag.get_dag(dag_id)
- dag.fileloc = file_path
- dag.last_parsed_time = timezone.utcnow()
- dag.sync_to_db(processor_subdir=processor_subdir)
+ with tempfile.TemporaryDirectory() as tmpdir:
+ old_dag_home = tempfile.mkdtemp(dir=tmpdir)
+ old_dag_file = tempfile.NamedTemporaryFile(dir=old_dag_home, suffix=".py")
+ old_dag_file.write(get_dag_string("test_example_bash_operator.py").encode())
+ old_dag_file.flush()
+ new_dag_home = tempfile.mkdtemp(dir=tmpdir)
+ new_dag_file = tempfile.NamedTemporaryFile(dir=new_dag_home, suffix=".py")
+ new_dag_file.write(get_dag_string("test_scheduler_dags.py").encode())
+ new_dag_file.flush()
+
+ manager = DagProcessorJobRunner(
+ job=Job(),
+ processor=DagFileProcessorManager(
+ dag_directory=new_dag_home,
+ max_runs=1,
+ processor_timeout=timedelta(minutes=10),
+ signal_conn=MagicMock(),
+ dag_ids=[],
+ pickle_dags=False,
+ async_mode=True,
+ ),
+ )
- def create_dag_folder(dag_id):
- dag_home = tempfile.mkdtemp(dir=tmpdir)
- dag_file = tempfile.NamedTemporaryFile(dir=dag_home, suffix=".py")
- dag_file.write(get_dag_string(dag_id).encode())
- dag_file.flush()
- return dag_home, dag_file
+ dagbag = DagBag(old_dag_file.name, read_dags_from_db=False)
+ other_dagbag = DagBag(new_dag_file.name, read_dags_from_db=False)
- with tempfile.TemporaryDirectory() as tmpdir:
- old_dag_home, old_dag_file = create_dag_folder("test_example_bash_operator.py")
- new_dag_home, new_dag_file = create_dag_folder("test_scheduler_dags.py")
- example_dag_home, example_dag_file = create_dag_folder("test_dag_warnings.py")
-
- with mock.patch("airflow.dag_processing.manager.example_dag_folder", example_dag_home):
- manager = DagProcessorJobRunner(
- job=Job(),
- processor=DagFileProcessorManager(
- dag_directory=new_dag_home,
- max_runs=1,
- processor_timeout=timedelta(minutes=10),
- signal_conn=MagicMock(),
- dag_ids=[],
- pickle_dags=False,
- async_mode=True,
- ),
- )
+ with create_session() as session:
+ # Add DAG from old DAG home to the DB
+ dag = dagbag.get_dag("test_example_bash_operator")
+ dag.fileloc = old_dag_file.name
+ dag.last_parsed_time = timezone.utcnow()
+ dag.sync_to_db(processor_subdir=old_dag_home)
- with create_session() as session:
- add_dag_to_db(old_dag_file.name, "test_example_bash_operator", old_dag_home)
- add_dag_to_db(new_dag_file.name, "test_start_date_scheduling", new_dag_home)
- add_dag_to_db(example_dag_file.name, "test_dag_warnings", example_dag_home)
+ # Add DAG from new DAG home to the DB
+ other_dag = other_dagbag.get_dag("test_start_date_scheduling")
+ other_dag.fileloc = new_dag_file.name
+ other_dag.last_parsed_time = timezone.utcnow()
+ other_dag.sync_to_db(processor_subdir=new_dag_home)
- manager.processor._file_paths = [new_dag_file, example_dag_file]
+ manager.processor._file_paths = [new_dag_file]
- active_dag_count = (
- session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
- )
- assert active_dag_count == 3
+ active_dag_count = (
+ session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
+ )
+ assert active_dag_count == 2
- manager.processor._scan_stale_dags()
+ manager.processor._scan_stale_dags()
- active_dag_count = (
- session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
- )
- assert active_dag_count == 2
+ active_dag_count = (
+ session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
+ )
+ assert active_dag_count == 1
@mock.patch(
"airflow.dag_processing.processor.DagFileProcessorProcess.waitable_handle",
new_callable=PropertyMock
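As an aside, a minimal sketch of the temp-DAG-file pattern the restored test relies on; the file body here is a stand-in, not a real DAG.

    import tempfile

    # NamedTemporaryFile buffers writes, so flush() is required before the
    # file is read back by name (as DagBag does in the test above).
    with tempfile.TemporaryDirectory() as tmpdir:
        dag_home = tempfile.mkdtemp(dir=tmpdir)
        dag_file = tempfile.NamedTemporaryFile(dir=dag_home, suffix=".py")
        dag_file.write(b"# DAG source would be written here\n")
        dag_file.flush()
        print(dag_file.name)  # lives under dag_home; cleaned up with tmpdir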