This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d52cb341444 Refactor dag processor for code clarity (#46443)
d52cb341444 is described below
commit d52cb341444f0b62752a2d5404f375dbc73f2792
Author: Daniel Standish <[email protected]>
AuthorDate: Fri Feb 7 11:18:29 2025 -0800
Refactor dag processor for code clarity (#46443)
Dag processor has experienced a lot of organic growth over the years and
had gotten into a state where it became a bit hard to understand. E.g. method
set_files did not have a name that clearly indicated its purpose. And that's
probably in part because it did a bunch of things and was called in a bunch of
places. I simplify that method and focus on just one aspect of its prior work,
namely handling removed files, and call it handle_removed_files.
A lot of the behavior was driven by a data structure on the instance called
file paths. This contained all known files. It's one thing that was modified in
set_files. From looking at the main loop it was not obvious where this
structure was being used or modified. So I made it a local variable instead of
instance attr. And now we can easily see all the methods that are using it
because it must be passed around.
I rename the file_paths to known_files because that is clearer about its
meaning. Previously it was file_paths and file_queue -- harder to understand
the different purposes. I make file_paths a dictionary because then it's easier
to replace all the files in the bundle, something that was previously done by
iterating through the files.
In prepare file paths, I pull out the mtime mode logic into its own method
because it's quite involved and made the prepare file paths method too big and
complicated. Along with this I simplify the logic to not exclude recently
processed files if they were recently changed.
In some of the tests, I had to change the way we simulated mtime numbers
because the input is now a set which does not guarantee order. So I encode the
mtime in the filename in the test. And the one test dealing with the zip file
was apparently flaky. I change it so we don't mock anything but just
copy the file to a tmp dir and make a bundle there, then remove it and see what
happens.
In clear_orphaned_import_errors I no longer pass the entire list of known
dag files, because there could be a lot of them.
---
airflow/dag_processing/manager.py | 309 ++++++++++++++++++-----------------
airflow/models/dag.py | 21 ++-
tests/dag_processing/test_manager.py | 218 ++++++++++++++----------
tests/models/test_dag.py | 5 +-
4 files changed, 310 insertions(+), 243 deletions(-)
diff --git a/airflow/dag_processing/manager.py
b/airflow/dag_processing/manager.py
index 3fb5ac3106d..f720cabbd04 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -31,7 +31,7 @@ import sys
import time
import zipfile
from collections import defaultdict, deque
-from collections.abc import Callable, Iterator
+from collections.abc import Callable, Iterable, Iterator
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from importlib import import_module
@@ -41,7 +41,8 @@ from typing import TYPE_CHECKING, Any, NamedTuple, cast
import attrs
import structlog
-from sqlalchemy import delete, select, tuple_, update
+from sqlalchemy import select, update
+from sqlalchemy.orm import load_only
from tabulate import tabulate
from uuid6 import uuid7
@@ -174,14 +175,13 @@ class DagFileProcessorManager(LoggingMixin):
heartbeat: Callable[[], None] = attrs.field(default=lambda: None)
"""An overridable heartbeat called once every time around the loop"""
- _files: list[DagFileInfo] = attrs.field(factory=list, init=False)
_file_queue: deque[DagFileInfo] = attrs.field(factory=deque, init=False)
_file_stats: dict[DagFileInfo, DagFileStat] = attrs.field(
factory=lambda: defaultdict(DagFileStat), init=False
)
_dag_bundles: list[BaseDagBundle] = attrs.field(factory=list, init=False)
- _bundle_versions: dict[str, str] = attrs.field(factory=dict, init=False)
+ _bundle_versions: dict[str, str | None] = attrs.field(factory=dict,
init=False)
_processors: dict[DagFileInfo, DagFileProcessorProcess] =
attrs.field(factory=dict, init=False)
@@ -296,6 +296,8 @@ class DagFileProcessorManager(LoggingMixin):
poll_time = 0.0
+ known_files: dict[str, set[DagFileInfo]] = {}
+
while True:
loop_start_time = time.monotonic()
@@ -303,17 +305,17 @@ class DagFileProcessorManager(LoggingMixin):
self._kill_timed_out_processors()
- self._refresh_dag_bundles()
+ self._refresh_dag_bundles(known_files=known_files)
if not self._file_queue:
# Generate more file paths to process if we processed all the
files already. Note for this to
# clear down, we must have cleared all files found from
scanning the dags dir _and_ have
# cleared all files added as a result of callbacks
- self.prepare_file_queue()
+ self.prepare_file_queue(known_files=known_files)
self.emit_metrics()
else:
# if new files found in dag dir, add them
- self.add_files_to_queue()
+ self.add_files_to_queue(known_files=known_files)
self._refresh_requested_filelocs()
@@ -331,7 +333,7 @@ class DagFileProcessorManager(LoggingMixin):
# Update number of loop iteration.
self._num_run += 1
- self._print_stat()
+ self.print_stats(known_files=known_files)
if self.max_runs_reached():
self.log.info(
@@ -432,7 +434,7 @@ class DagFileProcessorManager(LoggingMixin):
session.delete(request)
return filelocs
- def _refresh_dag_bundles(self):
+ def _refresh_dag_bundles(self, known_files: dict[str, set[DagFileInfo]]):
"""Refresh DAG bundles, if required."""
now = timezone.utcnow()
@@ -504,22 +506,19 @@ class DagFileProcessorManager(LoggingMixin):
self._bundle_versions[bundle.name] = version_after_refresh
- found_files = [
+ found_files = {
DagFileInfo(rel_path=p, bundle_name=bundle.name,
bundle_path=bundle.path)
for p in self._find_files_in_bundle(bundle)
- ]
+ }
- # Now that we have the files present in the latest bundle,
- # we need to update file_paths to include any new files
- # and remove any files that are no longer in the bundle.
- # We do this by removing all existing files that are in this bundle
- # and then adding all the current files back in.
- new_files = [f for f in self._files if f.bundle_name !=
bundle.name]
- new_files.extend(found_files)
- self.set_files(new_files)
+ known_files[bundle.name] = found_files
+ self.handle_removed_files(known_files=known_files)
- self.deactivate_deleted_dags(active_files=found_files)
- self.clear_nonexistent_import_errors()
+ self.deactivate_deleted_dags(bundle_name=bundle.name,
present=found_files)
+ self.clear_orphaned_import_errors(
+ bundle_name=bundle.name,
+ observed_filelocs={str(x.absolute_path) for x in found_files},
# todo: make relative
+ )
def _find_files_in_bundle(self, bundle: BaseDagBundle) -> list[Path]:
"""Get relative paths for dag files from bundle dir."""
@@ -530,8 +529,8 @@ class DagFileProcessorManager(LoggingMixin):
return rel_paths
- def deactivate_deleted_dags(self, active_files: list[DagFileInfo]) -> None:
- """Deactivate DAGs that come from files that are no longer present."""
+ def deactivate_deleted_dags(self, bundle_name: str, present:
set[DagFileInfo]) -> None:
+ """Deactivate DAGs that come from files that are no longer present in
bundle."""
def find_zipped_dags(abs_path: os.PathLike) -> Iterator[str]:
"""
@@ -549,37 +548,31 @@ class DagFileProcessorManager(LoggingMixin):
except zipfile.BadZipFile:
self.log.exception("There was an error accessing ZIP file %s
%s", abs_path)
- present: set[tuple[str, str]] = set()
- """
- Tuple containing bundle name and relative fileloc of the dag file.
-
- If the dag file is embedded in a zip file, the relative fileloc will
be the
- zip file path (relative to bundle path) joined with the path to the
dag file (relative
- to the zip file path).
- """
-
- for info in active_files:
+ rel_filelocs: list[str] = []
+ for info in present:
abs_path = str(info.absolute_path)
if abs_path.endswith(".py") or not zipfile.is_zipfile(abs_path):
- present.add((info.bundle_name, str(info.rel_path)))
+ rel_filelocs.append(str(info.rel_path))
else:
if TYPE_CHECKING:
assert info.bundle_path
for abs_sub_path in
find_zipped_dags(abs_path=info.absolute_path):
rel_sub_path =
Path(abs_sub_path).relative_to(info.bundle_path)
- present.add((info.bundle_name, str(rel_sub_path)))
+ rel_filelocs.append(str(rel_sub_path))
- DagModel.deactivate_deleted_dags(present)
+ DagModel.deactivate_deleted_dags(bundle_name=bundle_name,
rel_filelocs=rel_filelocs)
- def _print_stat(self):
+ def print_stats(self, known_files: dict[str, set[DagFileInfo]]):
"""Occasionally print out stats about how fast the files are getting
processed."""
if 0 < self.print_stats_interval < time.monotonic() -
self.last_stat_print_time:
- if self._files:
- self._log_file_processing_stats(self._files)
+ if known_files:
+ self._log_file_processing_stats(known_files=known_files)
self.last_stat_print_time = time.monotonic()
@provide_session
- def clear_nonexistent_import_errors(self, session=NEW_SESSION):
+ def clear_orphaned_import_errors(
+ self, bundle_name: str, observed_filelocs: set[str], session: Session
= NEW_SESSION
+ ):
"""
Clear import errors for files that no longer exist.
@@ -587,22 +580,18 @@ class DagFileProcessorManager(LoggingMixin):
"""
self.log.debug("Removing old import errors")
try:
- query = delete(ParseImportError)
-
- if self._files:
- query = query.where(
- tuple_(ParseImportError.filename,
ParseImportError.bundle_name).notin_(
- # todo AIP-66: ParseImportError should have rel
fileloce + bundle name
- [(str(f.absolute_path), f.bundle_name) for f in
self._files]
- ),
- )
-
-
session.execute(query.execution_options(synchronize_session="fetch"))
- session.commit()
+ errors = session.scalars(
+ select(ParseImportError)
+ .where(ParseImportError.bundle_name == bundle_name)
+ .options(load_only(ParseImportError.filename))
+ )
+ for error in errors:
+ if error.filename not in observed_filelocs:
+ session.delete(error)
except Exception:
self.log.exception("Error removing old import errors")
- def _log_file_processing_stats(self, known_files):
+ def _log_file_processing_stats(self, known_files: dict[str,
set[DagFileInfo]]):
"""
Print out stats about how files are getting processed.
@@ -634,32 +623,34 @@ class DagFileProcessorManager(LoggingMixin):
rows = []
utcnow = timezone.utcnow()
now = time.monotonic()
- for file in known_files:
- stat = self._file_stats[file]
- proc = self._processors.get(file)
- num_dags = stat.num_dags
- num_errors = stat.import_errors
- file_name = Path(file.rel_path).stem
- processor_pid = proc.pid if proc else None
- processor_start_time = proc.start_time if proc else None
- runtime = (now - processor_start_time) if processor_start_time
else None
- last_run = stat.last_finish_time
- if last_run:
- seconds_ago = (utcnow - last_run).total_seconds()
-
Stats.gauge(f"dag_processing.last_run.seconds_ago.{file_name}", seconds_ago)
-
- rows.append(
- (
- file.bundle_name,
- file.rel_path,
- processor_pid,
- runtime,
- num_dags,
- num_errors,
- stat.last_duration,
- last_run,
+
+ for files in known_files.values():
+ for file in files:
+ stat = self._file_stats[file]
+ proc = self._processors.get(file)
+ num_dags = stat.num_dags
+ num_errors = stat.import_errors
+ file_name = Path(file.rel_path).stem
+ processor_pid = proc.pid if proc else None
+ processor_start_time = proc.start_time if proc else None
+ runtime = (now - processor_start_time) if processor_start_time
else None
+ last_run = stat.last_finish_time
+ if last_run:
+ seconds_ago = (utcnow - last_run).total_seconds()
+
Stats.gauge(f"dag_processing.last_run.seconds_ago.{file_name}", seconds_ago)
+
+ rows.append(
+ (
+ file.bundle_name,
+ file.rel_path,
+ processor_pid,
+ runtime,
+ num_dags,
+ num_errors,
+ stat.last_duration,
+ last_run,
+ )
)
- )
# Sort by longest last runtime. (Can't sort None values in python3)
rows.sort(key=lambda x: x[5] or 0.0, reverse=True)
@@ -699,34 +690,51 @@ class DagFileProcessorManager(LoggingMixin):
self.log.info(log_str)
- def set_files(self, files: list[DagFileInfo]):
+ def handle_removed_files(self, known_files: dict[str, set[DagFileInfo]]):
"""
- Update the set of files to track in the dag processor.
+ Remove from data structures the files that are missing.
+
+ Also, terminate processes that may be running on those removed files.
- :param files: list of files
+ :param known_files: structure containing known files per-bundle
:return: None
"""
- self._files = files
+ files_set: set[DagFileInfo] = set()
+ """Set containing all observed files.
+
+ We consolidate to one set for performance.
+ """
+
+ for v in known_files.values():
+ files_set |= v
+ self.purge_removed_files_from_queue(present=files_set)
+ self.terminate_orphan_processes(present=files_set)
+ self.remove_orphaned_file_stats(present=files_set)
+
+ def purge_removed_files_from_queue(self, present: set[DagFileInfo]):
+ """Remove from queue any files no longer observed locally."""
+ self._file_queue = deque(x for x in self._file_queue if x in present)
Stats.gauge("dag_processing.file_path_queue_size",
len(self._file_queue))
- # Stop processors that are working on deleted files
- filtered_processors = {}
- for file, processor in self._processors.items():
- if file in files:
- filtered_processors[file] = processor
- else:
+ def remove_orphaned_file_stats(self, present: set[DagFileInfo]):
+ """Remove the stats for any dag files that don't exist anymore."""
+ # todo: store stats by bundle also?
+ stats_to_remove = set(self._file_stats).difference(present)
+ for file in stats_to_remove:
+ del self._file_stats[file]
+
+ def terminate_orphan_processes(self, present: set[DagFileInfo]):
+ """Stop processors that are working on deleted files."""
+ for file in list(self._processors.keys()):
+ if file not in present:
+ processor = self._processors.pop(file, None)
+ if not processor:
+ continue
self.log.warning("Stopping processor for %s", file)
Stats.decr("dag_processing.processes", tags={"file_path":
file, "action": "stop"})
processor.kill(signal.SIGKILL)
- self._file_stats.pop(file)
-
- to_remove = set(self._file_stats).difference(self._files)
- for file in to_remove:
- # Remove the stats for any dag files that don't exist anymore
- del self._file_stats[file]
-
- self._processors = filtered_processors
+ self._file_stats.pop(file, None)
@provide_session
def _collect_results(self, session: Session = NEW_SESSION):
@@ -825,14 +833,44 @@ class DagFileProcessorManager(LoggingMixin):
self._processors[file] = processor
Stats.gauge("dag_processing.file_path_queue_size",
len(self._file_queue))
- def add_files_to_queue(self):
- for file in self._files:
- if file not in self._file_stats:
- # We found new file after refreshing dir. add to parsing queue
at start
- self.log.info("Adding new file %s to parsing queue", file)
- self._file_queue.appendleft(file)
+ def add_files_to_queue(self, known_files: dict[str, set[DagFileInfo]]):
+ for files in known_files.values():
+ for file in files:
+ if file not in self._file_stats: # todo: store stats by
bundle also?
+ # We found new file after refreshing dir. add to parsing
queue at start
+ self.log.info("Adding new file %s to parsing queue", file)
+ self._file_queue.appendleft(file)
+
+ def _sort_by_mtime(self, files: Iterable[DagFileInfo]):
+ files_with_mtime: dict[DagFileInfo, float] = {}
+ changed_recently = set()
+ for file in files:
+ try:
+ modified_timestamp = os.path.getmtime(file.absolute_path)
+ modified_datetime = datetime.fromtimestamp(modified_timestamp,
tz=timezone.utc)
+ files_with_mtime[file] = modified_timestamp
+ last_time = self._file_stats[file].last_finish_time
+ if not last_time:
+ continue
+ if modified_datetime > last_time:
+ changed_recently.add(file)
+ except FileNotFoundError:
+ self.log.warning("Skipping processing of missing file: %s",
file)
+ self._file_stats.pop(file, None)
+ continue
+ file_infos = [info for info, ts in sorted(files_with_mtime.items(),
key=itemgetter(1), reverse=True)]
+ return file_infos, changed_recently
+
+ def processed_recently(self, now, file):
+ last_time = self._file_stats[file].last_finish_time
+ if not last_time:
+ return False
+ elapsed_ss = (now - last_time).total_seconds()
+ if elapsed_ss < self._file_process_interval:
+ return True
+ return False
- def prepare_file_queue(self):
+ def prepare_file_queue(self, known_files: dict[str, set[DagFileInfo]]):
"""
Scan dags dir to generate more file paths to process.
@@ -846,63 +884,34 @@ class DagFileProcessorManager(LoggingMixin):
# Sort the file paths by the parsing order mode
list_mode = conf.get("dag_processor", "file_parsing_sort_mode")
-
- files_with_mtime: dict[DagFileInfo, datetime] = {}
- file_infos: list[DagFileInfo] = []
- is_mtime_mode = list_mode == "modified_time"
-
- recently_processed: list[DagFileInfo] = []
- to_stop = set()
- for file in self._files:
- if is_mtime_mode:
- try:
- files_with_mtime[file] =
os.path.getmtime(file.absolute_path)
- except FileNotFoundError:
- self.log.warning("Skipping processing of missing file:
%s", file)
- self._file_stats.pop(file, None)
- to_stop.add(file)
- continue
- file_modified_time =
datetime.fromtimestamp(files_with_mtime[file], tz=timezone.utc)
- else:
- file_infos.append(file)
- file_modified_time = None
-
- # Find file paths that were recently processed to exclude them
- # from being added to file_queue
- # unless they were modified recently and parsing mode is
"modified_time"
- # in which case we don't honor "self._file_process_interval"
(min_file_process_interval)
- if (
- (last_finish_time := self._file_stats[file].last_finish_time)
is not None
- and (now - last_finish_time).total_seconds() <
self._file_process_interval
- and not (is_mtime_mode and file_modified_time and
(file_modified_time > last_finish_time))
- ):
- recently_processed.append(file)
-
- # Sort file paths via last modified time
- if is_mtime_mode:
- file_infos = [
- info for info, ts in sorted(files_with_mtime.items(),
key=itemgetter(1), reverse=True)
- ]
+ recently_processed = set()
+ files = []
+
+ for bundle_files in known_files.values():
+ for file in bundle_files:
+ files.append(file)
+ if self.processed_recently(now, file):
+ recently_processed.add(file)
+
+ changed_recently: set[DagFileInfo] = set()
+ if list_mode == "modified_time":
+ files, changed_recently = self._sort_by_mtime(files=files)
elif list_mode == "alphabetical":
- file_infos.sort(key=attrgetter("rel_path"))
+ files.sort(key=attrgetter("rel_path"))
elif list_mode == "random_seeded_by_host":
# Shuffle the list seeded by hostname so multiple DAG processors
can work on different
# set of files. Since we set the seed, the sort order will remain
same per host
- random.Random(get_hostname()).shuffle(file_infos)
-
- if to_stop:
- self.set_files([x for x in self._files if x not in to_stop])
+ random.Random(get_hostname()).shuffle(files)
at_run_limit = [info for info, stat in self._file_stats.items() if
stat.run_count == self.max_runs]
+ to_exclude = in_progress.union(at_run_limit)
- to_exclude = in_progress.union(
- recently_processed,
- at_run_limit,
- )
+ # exclude recently processed unless changed recently
+ to_exclude |= recently_processed - changed_recently
# Do not convert the following list to set as set does not preserve
the order
# and we need to maintain the order of files for `[dag_processor]
file_parsing_sort_mode`
- to_queue = [x for x in file_infos if x not in to_exclude]
+ to_queue = [x for x in files if x not in to_exclude]
if self.log.isEnabledFor(logging.DEBUG):
for path, processor in self._processors.items():
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index e571c016e19..7ebbb48b4b4 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -64,7 +64,7 @@ from sqlalchemy import (
)
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.ext.hybrid import hybrid_property
-from sqlalchemy.orm import backref, relationship
+from sqlalchemy.orm import backref, load_only, relationship
from sqlalchemy.sql import Select, expression
from airflow import settings, utils
@@ -2280,24 +2280,33 @@ class DagModel(Base):
@provide_session
def deactivate_deleted_dags(
cls,
- active: set[tuple[str, str]],
+ bundle_name: str,
+ rel_filelocs: list[str],
session: Session = NEW_SESSION,
) -> None:
"""
Set ``is_active=False`` on the DAGs for which the DAG files have been
removed.
- :param active: tuples (bundle name, relative fileloc) of files that
were observed.
+ :param bundle_name: bundle for filelocs
+ :param rel_filelocs: relative filelocs for bundle
:param session: ORM Session
"""
log.debug("Deactivating DAGs (for which DAG files are deleted) from %s
table ", cls.__tablename__)
dag_models = session.scalars(
- select(cls).where(
- cls.relative_fileloc.is_not(None),
+ select(cls)
+ .where(
+ cls.bundle_name == bundle_name,
+ )
+ .options(
+ load_only(
+ cls.relative_fileloc,
+ cls.is_active,
+ ),
)
)
for dm in dag_models:
- if (dm.bundle_name, dm.relative_fileloc) not in active:
+ if dm.relative_fileloc not in rel_filelocs:
dm.is_active = False
@classmethod
diff --git a/tests/dag_processing/test_manager.py
b/tests/dag_processing/test_manager.py
index 46d6fe5f50a..e5cef2e8033 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -22,6 +22,8 @@ import json
import logging
import os
import random
+import re
+import shutil
import signal
import textwrap
import time
@@ -34,7 +36,7 @@ from unittest.mock import MagicMock
import pytest
import time_machine
-from sqlalchemy import func
+from sqlalchemy import func, select
from uuid6 import uuid7
from airflow.callbacks.callback_requests import DagCallbackRequest
@@ -76,10 +78,35 @@ TEST_DAG_FOLDER = Path(__file__).parents[1].resolve() /
"dags"
DEFAULT_DATE = timezone.datetime(2016, 1, 1)
-def _get_dag_file_paths(files: list[str | Path]) -> list[DagFileInfo]:
+def _get_file_infos(files: list[str | Path]) -> list[DagFileInfo]:
return [DagFileInfo(bundle_name="testing", bundle_path=TEST_DAGS_FOLDER,
rel_path=Path(f)) for f in files]
+def mock_get_mtime(file: Path):
+ f = str(file)
+ m = re.match(pattern=r".*ss=(.+?)\.\w+", string=f)
+ if not m:
+ raise ValueError(f"unexpected: {file}")
+ match = m.group(1)
+ if match == "<class 'FileNotFoundError'>":
+ raise FileNotFoundError()
+ try:
+ return int(match)
+ except Exception:
+ raise ValueError(f"could not convert value {match} to int")
+
+
+def encode_mtime_in_filename(val):
+ from pathlib import PurePath
+
+ out = []
+ for fname, mtime in val:
+ f = PurePath(PurePath(fname).name)
+ addition = f"ss={str(mtime)}"
+ out.append(f"{f.stem}-{addition}{f.suffix}")
+ return out
+
+
class TestDagFileProcessorManager:
@pytest.fixture(autouse=True)
def _disable_examples(self):
@@ -190,117 +217,129 @@ class TestDagFileProcessorManager:
assert file_2 in manager._processors.keys()
assert deque([file_3]) == manager._file_queue
- def
test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self):
+ def
test_handle_removed_files_when_processor_file_path_not_in_new_file_paths(self):
"""Ensure processors and file stats are removed when the file path is
not in the new file paths"""
manager = DagFileProcessorManager(max_runs=1)
+ bundle_name = "testing"
file = DagFileInfo(
- bundle_name="testing", rel_path=Path("missing_file.txt"),
bundle_path=TEST_DAGS_FOLDER
+ bundle_name=bundle_name, rel_path=Path("missing_file.txt"),
bundle_path=TEST_DAGS_FOLDER
)
manager._processors[file] = MagicMock()
manager._file_stats[file] = DagFileStat()
- manager.set_files(["abc.txt"])
+ manager.handle_removed_files({bundle_name: set()})
assert manager._processors == {}
assert file not in manager._file_stats
- def
test_set_file_paths_when_processor_file_path_is_in_new_file_paths(self):
+ def test_handle_removed_files_when_processor_file_path_is_present(self):
+ """handle_removed_files should not purge files that are still
present."""
manager = DagFileProcessorManager(max_runs=1)
- file = DagFileInfo(bundle_name="testing", rel_path=Path("abc.txt"),
bundle_path=TEST_DAGS_FOLDER)
+ bundle_name = "testing"
+ file = DagFileInfo(bundle_name=bundle_name, rel_path=Path("abc.txt"),
bundle_path=TEST_DAGS_FOLDER)
mock_processor = MagicMock()
manager._processors[file] = mock_processor
- manager.set_files([file])
+ manager.handle_removed_files(known_files={bundle_name: {file}})
assert manager._processors == {file: mock_processor}
@conf_vars({("dag_processor", "file_parsing_sort_mode"): "alphabetical"})
- def test_file_paths_in_queue_sorted_alphabetically(self):
+ def test_files_in_queue_sorted_alphabetically(self):
"""Test dag files are sorted alphabetically"""
file_names = ["file_3.py", "file_2.py", "file_4.py", "file_1.py"]
- dag_files = _get_dag_file_paths(file_names)
- ordered_dag_files = _get_dag_file_paths(sorted(file_names))
+ dag_files = _get_file_infos(file_names)
+ ordered_dag_files = _get_file_infos(sorted(file_names))
manager = DagFileProcessorManager(max_runs=1)
-
- manager.set_files(dag_files)
+ known_files = {"some-bundle": set(dag_files)}
assert manager._file_queue == deque()
- manager.prepare_file_queue()
+ manager.prepare_file_queue(known_files=known_files)
assert manager._file_queue == deque(ordered_dag_files)
@conf_vars({("dag_processor", "file_parsing_sort_mode"):
"random_seeded_by_host"})
- def test_file_paths_in_queue_sorted_random_seeded_by_host(self):
+ def test_files_sorted_random_seeded_by_host(self):
"""Test files are randomly sorted and seeded by host name"""
- dag_files = _get_dag_file_paths(["file_3.py", "file_2.py",
"file_4.py", "file_1.py"])
+ f_infos = _get_file_infos(["file_3.py", "file_2.py", "file_4.py",
"file_1.py"])
+ known_files = {"anything": f_infos}
manager = DagFileProcessorManager(max_runs=1)
- manager.set_files(dag_files)
assert manager._file_queue == deque()
- manager.prepare_file_queue()
-
- expected_order = deque(dag_files)
- random.Random(get_hostname()).shuffle(expected_order)
- assert manager._file_queue == expected_order
+ manager.prepare_file_queue(known_files=known_files) # using list over
test for reproducibility
+ random.Random(get_hostname()).shuffle(f_infos)
+ expected = deque(f_infos)
+ assert manager._file_queue == expected
# Verify running it again produces same order
manager._files = []
- manager.prepare_file_queue()
- assert manager._file_queue == expected_order
+ manager.prepare_file_queue(known_files=known_files)
+ assert manager._file_queue == expected
@conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"})
- @mock.patch("airflow.utils.file.os.path.getmtime")
- def test_file_paths_in_queue_sorted_by_modified_time(self, mock_getmtime):
+ @mock.patch("airflow.utils.file.os.path.getmtime", new=mock_get_mtime)
+ def test_files_sorted_by_modified_time(self):
"""Test files are sorted by modified time"""
- paths_with_mtime = {"file_3.py": 3.0, "file_2.py": 2.0, "file_4.py":
5.0, "file_1.py": 4.0}
- dag_files = _get_dag_file_paths(paths_with_mtime.keys())
- mock_getmtime.side_effect = list(paths_with_mtime.values())
+ paths_with_mtime = [
+ ("file_3.py", 3.0),
+ ("file_2.py", 2.0),
+ ("file_4.py", 5.0),
+ ("file_1.py", 4.0),
+ ]
+ filenames = encode_mtime_in_filename(paths_with_mtime)
+ dag_files = _get_file_infos(filenames)
manager = DagFileProcessorManager(max_runs=1)
- manager.set_files(dag_files)
assert manager._file_queue == deque()
- manager.prepare_file_queue()
- ordered_files = _get_dag_file_paths(["file_4.py", "file_1.py",
"file_3.py", "file_2.py"])
+ manager.prepare_file_queue(known_files={"any": set(dag_files)})
+ ordered_files = _get_file_infos(
+ [
+ "file_4-ss=5.0.py",
+ "file_1-ss=4.0.py",
+ "file_3-ss=3.0.py",
+ "file_2-ss=2.0.py",
+ ]
+ )
assert manager._file_queue == deque(ordered_files)
@conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"})
- @mock.patch("airflow.utils.file.os.path.getmtime")
- def test_file_paths_in_queue_excludes_missing_file(self, mock_getmtime):
+ @mock.patch("airflow.utils.file.os.path.getmtime", new=mock_get_mtime)
+ def test_queued_files_exclude_missing_file(self):
"""Check that a file is not enqueued for processing if it has been
deleted"""
- dag_files = _get_dag_file_paths(["file_3.py", "file_2.py",
"file_4.py"])
- mock_getmtime.side_effect = [1.0, 2.0, FileNotFoundError()]
-
+ file_and_mtime = [("file_3.py", 2.0), ("file_2.py", 3.0),
("file_4.py", FileNotFoundError)]
+ filenames = encode_mtime_in_filename(file_and_mtime)
+ file_infos = _get_file_infos(filenames)
manager = DagFileProcessorManager(max_runs=1)
-
- manager.set_files(dag_files)
- manager.prepare_file_queue()
-
- ordered_files = _get_dag_file_paths(["file_2.py", "file_3.py"])
+ manager.prepare_file_queue(known_files={"any": set(file_infos)})
+ ordered_files = _get_file_infos(["file_2-ss=3.0.py",
"file_3-ss=2.0.py"])
assert manager._file_queue == deque(ordered_files)
@conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"})
- @mock.patch("airflow.utils.file.os.path.getmtime")
- def test_add_new_file_to_parsing_queue(self, mock_getmtime):
+ @mock.patch("airflow.utils.file.os.path.getmtime", new=mock_get_mtime)
+ def test_add_new_file_to_parsing_queue(self):
"""Check that new file is added to parsing queue"""
- dag_files = _get_dag_file_paths(["file_1.py", "file_2.py",
"file_3.py"])
- mock_getmtime.side_effect = [1.0, 2.0, 3.0]
+ dag_files = _get_file_infos(["file_1-ss=2.0.py", "file_2-ss=3.0.py",
"file_3-ss=4.0.py"])
+ from random import Random
+ Random("file_2.py").random()
manager = DagFileProcessorManager(max_runs=1)
- manager.set_files(dag_files)
- manager.prepare_file_queue()
- ordered_files = _get_dag_file_paths(["file_3.py", "file_2.py",
"file_1.py"])
- assert manager._file_queue == deque(ordered_files)
+ manager.prepare_file_queue(known_files={"any": set(dag_files)})
+ assert set(manager._file_queue) == set(dag_files)
- manager.set_files(
+ manager.prepare_file_queue(
+ known_files={"any": set((*dag_files,
*_get_file_infos(["file_4-ss=1.0.py"])))}
+ )
+ # manager.add_files_to_queue()
+ ordered_files = _get_file_infos(
[
- *dag_files,
- DagFileInfo(bundle_name="testing", rel_path=Path("file_4.py"),
bundle_path=TEST_DAGS_FOLDER),
+ "file_3-ss=4.0.py",
+ "file_2-ss=3.0.py",
+ "file_1-ss=2.0.py",
+ "file_4-ss=1.0.py",
]
)
- manager.add_files_to_queue()
- ordered_files = _get_dag_file_paths(["file_4.py", "file_3.py",
"file_2.py", "file_1.py"])
assert manager._file_queue == deque(ordered_files)
@conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"})
@@ -314,7 +353,7 @@ class TestDagFileProcessorManager:
dag_file = DagFileInfo(
bundle_name="testing", rel_path=Path("file_1.py"),
bundle_path=TEST_DAGS_FOLDER
)
- dag_files = [dag_file]
+ known_files = {"does-not-matter": {dag_file}}
mock_getmtime.side_effect = [initial_file_1_mtime]
manager = DagFileProcessorManager(max_runs=3)
@@ -325,10 +364,9 @@ class TestDagFileProcessorManager:
dag_file: DagFileStat(1, 0, last_finish_time, 1.0, 1, 1),
}
with time_machine.travel(freezed_base_time):
- manager.set_files(dag_files)
assert manager._file_queue == deque()
# File Path Queue will be empty as the "modified time" < "last
finish time"
- manager.prepare_file_queue()
+ manager.prepare_file_queue(known_files=known_files)
assert manager._file_queue == deque()
# Simulate the DAG modification by using modified_time which is greater
@@ -336,13 +374,12 @@ class TestDagFileProcessorManager:
file_1_new_mtime = freezed_base_time - timedelta(seconds=5)
file_1_new_mtime_ts = file_1_new_mtime.timestamp()
with time_machine.travel(freezed_base_time):
- manager.set_files(dag_files)
assert manager._file_queue == deque()
# File Path Queue will be empty as the "modified time" < "last
finish time"
mock_getmtime.side_effect = [file_1_new_mtime_ts]
- manager.prepare_file_queue()
+ manager.prepare_file_queue(known_files=known_files)
# Check that file is added to the queue even though file was just
recently passed
- assert manager._file_queue == deque(dag_files)
+ assert manager._file_queue == deque([dag_file])
assert last_finish_time < file_1_new_mtime
assert (
manager._file_process_interval
@@ -363,7 +400,7 @@ class TestDagFileProcessorManager:
manager = DagFileProcessorManager(dag_directory="directory",
max_runs=1)
- manager.set_files(dag_files)
+ manager.handle_removed_files(dag_files)
manager._file_queue = deque(["file_2.py", "file_3.py", "file_4.py",
"file_1.py"])
manager._refresh_requested_filelocs()
assert manager._file_queue == deque(["file_1.py", "file_2.py",
"file_3.py", "file_4.py"])
@@ -608,32 +645,37 @@ class TestDagFileProcessorManager:
assert dag.get_is_active()
@pytest.mark.usefixtures("testing_dag_bundle")
- def test_refresh_dags_dir_deactivates_deleted_zipped_dags(self, tmp_path,
configure_testing_dag_bundle):
+ def test_refresh_dags_dir_deactivates_deleted_zipped_dags(
+ self, session, tmp_path, configure_testing_dag_bundle
+ ):
"""Test DagFileProcessorManager._refresh_dag_dir method"""
- dagbag = DagBag(dag_folder=tmp_path, include_examples=False)
- zipped_dag_path = os.path.join(TEST_DAGS_FOLDER, "test_zip.zip")
- dagbag.process_file(zipped_dag_path)
- dag = dagbag.get_dag("test_zip_dag")
- dag.sync_to_db()
- SerializedDagModel.write_dag(dag, bundle_name="testing")
+ dag_id = "test_zip_dag"
+ filename = "test_zip.zip"
+ source_location = os.path.join(TEST_DAGS_FOLDER, filename)
+ bundle_path = Path(tmp_path,
"test_refresh_dags_dir_deactivates_deleted_zipped_dags")
+ bundle_path.mkdir(exist_ok=True)
+ zip_dag_path = bundle_path / filename
+ shutil.copy(source_location, zip_dag_path)
+
+ with configure_testing_dag_bundle(bundle_path):
+ manager = DagFileProcessorManager(max_runs=1)
+ manager.run()
- # TODO: this test feels a bit fragile - pointing at the zip directly
causes the test to fail
- # TODO: jed look at this more closely - bagbad then process_file?!
+ assert SerializedDagModel.has_dag(dag_id)
+ assert DagCode.has_dag(dag_id)
+ assert DagVersion.get_latest_version(dag_id)
+ dag = session.scalar(select(DagModel).where(DagModel.dag_id ==
dag_id))
+ assert dag.is_active is True
- # Mock might_contain_dag to mimic deleting the python file from the zip
- with mock.patch("airflow.dag_processing.manager.might_contain_dag",
return_value=False):
- with configure_testing_dag_bundle(TEST_DAGS_FOLDER):
- manager = DagFileProcessorManager(max_runs=1)
- manager.run()
+ os.remove(zip_dag_path)
- # Deleting the python file should not delete SDM for versioning sake
- assert SerializedDagModel.has_dag("test_zip_dag")
- # assert code not deleted for versioning sake
- assert DagCode.has_dag(dag.dag_id)
- # assert dagversion was not deleted
- assert DagVersion.get_latest_version(dag.dag_id)
- # assert dag deactivated
- assert not dag.get_is_active()
+ manager.run()
+
+ assert SerializedDagModel.has_dag(dag_id)
+ assert DagCode.has_dag(dag_id)
+ assert DagVersion.get_latest_version(dag_id)
+ dag = session.scalar(select(DagModel).where(DagModel.dag_id ==
dag_id))
+ assert dag.is_active is False
def test_deactivate_deleted_dags(self, dag_maker):
with dag_maker("test_dag1") as dag1:
@@ -643,12 +685,16 @@ class TestDagFileProcessorManager:
dag_maker.sync_dagbag_to_db()
active_files = [
- DagFileInfo(bundle_name="dag_maker",
rel_path=Path("test_dag1.py"), bundle_path=TEST_DAGS_FOLDER),
+ DagFileInfo(
+ bundle_name="dag_maker",
+ rel_path=Path("test_dag1.py"),
+ bundle_path=TEST_DAGS_FOLDER,
+ ),
# Mimic that the test_dag2.py file is deleted
]
manager = DagFileProcessorManager(max_runs=1)
- manager.deactivate_deleted_dags(active_files=active_files)
+ manager.deactivate_deleted_dags("dag_maker", active_files)
dagbag = DagBag(read_dags_from_db=True)
# The DAG from test_dag1.py is still active
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 84801d6cfb5..af815279737 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -1090,7 +1090,10 @@ class TestDag:
assert orm_dag.is_active
-
DagModel.deactivate_deleted_dags(list_py_file_paths(settings.DAGS_FOLDER))
+ DagModel.deactivate_deleted_dags(
+ bundle_name=orm_dag.bundle_name,
+ rel_filelocs=list_py_file_paths(settings.DAGS_FOLDER),
+ )
orm_dag = session.query(DagModel).filter(DagModel.dag_id ==
dag_id).one()
assert not orm_dag.is_active