This is an automated email from the ASF dual-hosted git repository.
amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8badad15411 Move find_path_from_directory to shared module_loading library (#60114)
8badad15411 is described below
commit 8badad1541132d50a8b716c3f9715f0b620da72f
Author: Amogh Desai <[email protected]>
AuthorDate: Mon Jan 5 14:16:48 2026 +0530
Move find_path_from_directory to shared module_loading library (#60114)
---
airflow-core/src/airflow/plugins_manager.py | 13 +-
airflow-core/src/airflow/utils/file.py | 193 +++------------------
.../tests/unit/plugins/test_plugin_ignore.py | 2 +-
airflow-core/tests/unit/utils/test_file.py | 115 +-----------
shared/module_loading/pyproject.toml | 1 +
.../src/airflow_shared/module_loading/__init__.py | 4 +
.../module_loading/file_discovery.py | 168 +-----------------
.../tests/module_loading/test_file_discovery.py | 139 +++++++++++++++
task-sdk/pyproject.toml | 1 +
9 files changed, 184 insertions(+), 452 deletions(-)
diff --git a/airflow-core/src/airflow/plugins_manager.py b/airflow-core/src/airflow/plugins_manager.py
index 40cfe9bd25d..fd4b538f36f 100644
--- a/airflow-core/src/airflow/plugins_manager.py
+++ b/airflow-core/src/airflow/plugins_manager.py
@@ -32,13 +32,17 @@ from pathlib import Path
from typing import TYPE_CHECKING, Any
from airflow import settings
-from airflow._shared.module_loading import entry_points_with_dist, import_string, qualname
+from airflow._shared.module_loading import (
+ entry_points_with_dist,
+ find_path_from_directory,
+ import_string,
+ qualname,
+)
from airflow.configuration import conf
from airflow.task.priority_strategy import (
PriorityWeightStrategy,
airflow_priority_weight_strategies,
)
-from airflow.utils.file import find_path_from_directory
if TYPE_CHECKING:
from airflow.lineage.hook import HookLineageReader
@@ -205,7 +209,8 @@ def _load_plugins_from_plugin_directory() -> tuple[list[AirflowPlugin], dict[str
if settings.PLUGINS_FOLDER is None:
raise ValueError("Plugins folder is not set")
log.debug("Loading plugins from directory: %s", settings.PLUGINS_FOLDER)
- files = find_path_from_directory(settings.PLUGINS_FOLDER, ".airflowignore")
+ ignore_file_syntax = conf.get_mandatory_value("core", "DAG_IGNORE_FILE_SYNTAX", fallback="glob")
+ files = find_path_from_directory(settings.PLUGINS_FOLDER, ".airflowignore", ignore_file_syntax)
plugin_search_locations: list[tuple[str, Generator[str, None, None]]] = [("", files)]
if conf.getboolean("core", "LOAD_EXAMPLES"):
@@ -213,7 +218,7 @@ def _load_plugins_from_plugin_directory() -> tuple[list[AirflowPlugin], dict[str
from airflow.example_dags import plugins as example_plugins
example_plugins_folder = next(iter(example_plugins.__path__))
- example_files = find_path_from_directory(example_plugins_folder, ".airflowignore")
+ example_files = find_path_from_directory(example_plugins_folder, ".airflowignore", ignore_file_syntax)
plugin_search_locations.append((example_plugins.__name__, example_files))
plugins: list[AirflowPlugin] = []
diff --git a/airflow-core/src/airflow/utils/file.py b/airflow-core/src/airflow/utils/file.py
index 76cdf846fe0..c614cfff0ad 100644
--- a/airflow-core/src/airflow/utils/file.py
+++ b/airflow-core/src/airflow/utils/file.py
@@ -26,10 +26,7 @@ import zipfile
from collections.abc import Generator
from io import TextIOWrapper
from pathlib import Path
-from re import Pattern
-from typing import NamedTuple, Protocol, overload
-
-from pathspec.patterns import GitWildMatchPattern
+from typing import overload
from airflow.configuration import conf
@@ -38,93 +35,6 @@ log = logging.getLogger(__name__)
MODIFIED_DAG_MODULE_NAME = "unusual_prefix_{path_hash}_{module_name}"
-class _IgnoreRule(Protocol):
- """Interface for ignore rules for structural subtyping."""
-
- @staticmethod
- def compile(pattern: str, base_dir: Path, definition_file: Path) -> _IgnoreRule | None:
- """
- Build an ignore rule from the supplied pattern.
-
- ``base_dir`` and ``definition_file`` should be absolute paths.
- """
-
- @staticmethod
- def match(path: Path, rules: list[_IgnoreRule]) -> bool:
- """Match a candidate absolute path against a list of rules."""
-
-
-class _RegexpIgnoreRule(NamedTuple):
- """Typed namedtuple with utility functions for regexp ignore rules."""
-
- pattern: Pattern
- base_dir: Path
-
- @staticmethod
- def compile(pattern: str, base_dir: Path, definition_file: Path) -> _IgnoreRule | None:
- """Build an ignore rule from the supplied regexp pattern and log a useful warning if it is invalid."""
- try:
- return _RegexpIgnoreRule(re.compile(pattern), base_dir)
- except re.error as e:
- log.warning("Ignoring invalid regex '%s' from %s: %s", pattern, definition_file, e)
- return None
-
- @staticmethod
- def match(path: Path, rules: list[_IgnoreRule]) -> bool:
- """Match a list of ignore rules against the supplied path."""
- for rule in rules:
- if not isinstance(rule, _RegexpIgnoreRule):
- raise ValueError(f"_RegexpIgnoreRule cannot match rules of type: {type(rule)}")
- if rule.pattern.search(str(path.relative_to(rule.base_dir))) is not None:
- return True
- return False
-
-
-class _GlobIgnoreRule(NamedTuple):
- """Typed namedtuple with utility functions for glob ignore rules."""
-
- wild_match_pattern: GitWildMatchPattern
- relative_to: Path | None = None
-
- @staticmethod
- def compile(pattern: str, base_dir: Path, definition_file: Path) -> _IgnoreRule | None:
- """Build an ignore rule from the supplied glob pattern and log a useful warning if it is invalid."""
- relative_to: Path | None = None
- if pattern.strip() == "/":
- # "/" doesn't match anything in gitignore
- log.warning("Ignoring no-op glob pattern '/' from %s", definition_file)
- return None
- if pattern.startswith("/") or "/" in pattern.rstrip("/"):
- # See https://git-scm.com/docs/gitignore
- # > If there is a separator at the beginning or middle (or both) of the pattern, then the
- # > pattern is relative to the directory level of the particular .gitignore file itself.
- # > Otherwise the pattern may also match at any level below the .gitignore level.
- relative_to = definition_file.parent
-
- ignore_pattern = GitWildMatchPattern(pattern)
- return _GlobIgnoreRule(wild_match_pattern=ignore_pattern, relative_to=relative_to)
-
- @staticmethod
- def match(path: Path, rules: list[_IgnoreRule]) -> bool:
- """Match a list of ignore rules against the supplied path, accounting for exclusion rules and ordering."""
- matched = False
- for rule in rules:
- if not isinstance(rule, _GlobIgnoreRule):
- raise ValueError(f"_GlobIgnoreRule cannot match rules of type: {type(rule)}")
- rel_obj = path.relative_to(rule.relative_to) if rule.relative_to else Path(path.name)
- if path.is_dir():
- rel_path = f"{rel_obj.as_posix()}/"
- else:
- rel_path = rel_obj.as_posix()
- if (
- rule.wild_match_pattern.include is not None
- and rule.wild_match_pattern.match_file(rel_path) is not None
- ):
- matched = rule.wild_match_pattern.include
-
- return matched
-
-
ZIP_REGEX = re.compile(rf"((.*\.zip){re.escape(os.sep)})?(.*)")
@@ -164,84 +74,6 @@ def open_maybe_zipped(fileloc, mode="r"):
return open(fileloc, mode=mode)
-def _find_path_from_directory(
- base_dir_path: str | os.PathLike[str],
- ignore_file_name: str,
- ignore_rule_type: type[_IgnoreRule],
-) -> Generator[str, None, None]:
- """
- Recursively search the base path and return the list of file paths that should not be ignored.
-
- :param base_dir_path: the base path to be searched
- :param ignore_file_name: the file name containing regular expressions for files that should be ignored.
- :param ignore_rule_type: the concrete class for ignore rules, which implements the _IgnoreRule interface.
-
- :return: a generator of file paths which should not be ignored.
- """
- # A Dict of patterns, keyed using resolved, absolute paths
- patterns_by_dir: dict[Path, list[_IgnoreRule]] = {}
-
- for root, dirs, files in os.walk(base_dir_path, followlinks=True):
- patterns: list[_IgnoreRule] = patterns_by_dir.get(Path(root).resolve(), [])
-
- ignore_file_path = Path(root) / ignore_file_name
- if ignore_file_path.is_file():
- with open(ignore_file_path) as ifile:
- patterns_to_match_excluding_comments = [
- re.sub(r"\s*#.*", "", line) for line in ifile.read().split("\n")
- ]
- # append new patterns and filter out "None" objects, which are invalid patterns
- patterns += [
- p
- for p in [
- ignore_rule_type.compile(pattern, Path(base_dir_path), ignore_file_path)
- for pattern in patterns_to_match_excluding_comments
- if pattern
- ]
- if p is not None
- ]
- # evaluation order of patterns is important with negation
- # so that later patterns can override earlier patterns
-
- dirs[:] = [subdir for subdir in dirs if not ignore_rule_type.match(Path(root) / subdir, patterns)]
- # explicit loop for infinite recursion detection since we are following symlinks in this walk
- for sd in dirs:
- dirpath = (Path(root) / sd).resolve()
- if dirpath in patterns_by_dir:
- raise RuntimeError(
- "Detected recursive loop when walking DAG directory "
- f"{base_dir_path}: {dirpath} has appeared more than once."
- )
- patterns_by_dir.update({dirpath: patterns.copy()})
-
- for file in files:
- if file != ignore_file_name:
- abs_file_path = Path(root) / file
- if not ignore_rule_type.match(abs_file_path, patterns):
- yield str(abs_file_path)
-
-
-def find_path_from_directory(
- base_dir_path: str | os.PathLike[str],
- ignore_file_name: str,
- ignore_file_syntax: str = conf.get_mandatory_value("core", "DAG_IGNORE_FILE_SYNTAX", fallback="glob"),
-) -> Generator[str, None, None]:
- """
- Recursively search the base path for a list of file paths that should not be ignored.
-
- :param base_dir_path: the base path to be searched
- :param ignore_file_name: the file name in which specifies the patterns of files/dirs to be ignored
- :param ignore_file_syntax: the syntax of patterns in the ignore file: regexp or glob
-
- :return: a generator of file paths.
- """
- if ignore_file_syntax == "glob" or not ignore_file_syntax:
- return _find_path_from_directory(base_dir_path, ignore_file_name, _GlobIgnoreRule)
- if ignore_file_syntax == "regexp":
- return _find_path_from_directory(base_dir_path, ignore_file_name, _RegexpIgnoreRule)
- raise ValueError(f"Unsupported ignore_file_syntax: {ignore_file_syntax}")
-
-
def list_py_file_paths(
directory: str | os.PathLike[str] | None,
safe_mode: bool = conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE", fallback=True),
@@ -268,9 +100,12 @@ def list_py_file_paths(
def find_dag_file_paths(directory: str | os.PathLike[str], safe_mode: bool) -> list[str]:
"""Find file paths of all DAG files."""
+ from airflow._shared.module_loading.file_discovery import find_path_from_directory
+
file_paths = []
+ ignore_file_syntax = conf.get_mandatory_value("core", "DAG_IGNORE_FILE_SYNTAX", fallback="glob")
- for file_path in find_path_from_directory(directory, ".airflowignore"):
+ for file_path in find_path_from_directory(directory, ".airflowignore", ignore_file_syntax):
path = Path(file_path)
try:
if path.is_file() and (path.suffix == ".py" or zipfile.is_zipfile(path)):
@@ -353,3 +188,21 @@ def get_unique_dag_module_name(file_path: str) -> str:
org_mod_name = re.sub(r"[.-]", "_", Path(file_path).stem)
return MODIFIED_DAG_MODULE_NAME.format(path_hash=path_hash, module_name=org_mod_name)
raise ValueError("file_path should be a string to generate unique module name")
+
+
+def __getattr__(name: str):
+ if name == "find_path_from_directory":
+ import warnings
+
+ from airflow._shared.module_loading import find_path_from_directory
+ from airflow.utils.deprecation_tools import DeprecatedImportWarning
+
+ warnings.warn(
+ "Importing find_path_from_directory from airflow.utils.file is deprecated "
+ "and will be removed in a future version. "
+ "Use airflow._shared.module_loading.find_path_from_directory instead.",
+ DeprecatedImportWarning,
+ stacklevel=2,
+ )
+ return find_path_from_directory
+ raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
diff --git a/airflow-core/tests/unit/plugins/test_plugin_ignore.py b/airflow-core/tests/unit/plugins/test_plugin_ignore.py
index 671cb71dacc..68e56abd7d3 100644
--- a/airflow-core/tests/unit/plugins/test_plugin_ignore.py
+++ b/airflow-core/tests/unit/plugins/test_plugin_ignore.py
@@ -21,7 +21,7 @@ from pathlib import Path
from unittest.mock import patch
from airflow import settings
-from airflow.utils.file import find_path_from_directory
+from airflow._shared.module_loading import find_path_from_directory
def populate_dir(root_path):
diff --git a/airflow-core/tests/unit/utils/test_file.py b/airflow-core/tests/unit/utils/test_file.py
index fc46a3a6560..cc55c1ac063 100644
--- a/airflow-core/tests/unit/utils/test_file.py
+++ b/airflow-core/tests/unit/utils/test_file.py
@@ -19,16 +19,15 @@ from __future__ import annotations
import os
import zipfile
-from pathlib import Path
from pprint import pformat
from unittest import mock
import pytest
+from airflow._shared.module_loading import find_path_from_directory
from airflow.utils import file as file_utils
from airflow.utils.file import (
correct_maybe_zipped,
- find_path_from_directory,
list_py_file_paths,
open_maybe_zipped,
)
@@ -97,23 +96,6 @@ class TestOpenMaybeZipped:
class TestListPyFilesPath:
- @pytest.fixture
- def test_dir(self, tmp_path):
- # create test tree with symlinks
- source = os.path.join(tmp_path, "folder")
- target = os.path.join(tmp_path, "symlink")
- py_file = os.path.join(source, "hello_world.py")
- ignore_file = os.path.join(tmp_path, ".airflowignore")
- os.mkdir(source)
- os.symlink(source, target)
- # write ignore files
- with open(ignore_file, "w") as f:
- f.write("folder")
- # write sample pyfile
- with open(py_file, "w") as f:
- f.write("print('hello world')")
- return tmp_path
-
def test_find_path_from_directory_regex_ignore(self):
should_ignore = [
"test_invalid_cron.py",
@@ -154,106 +136,11 @@ class TestListPyFilesPath:
f"actual_included_filenames: {pformat(actual_included_filenames)}\nexpected_included_filenames: {pformat(should_not_ignore)}"
)
- def test_find_path_from_directory_respects_symlinks_regexp_ignore(self, test_dir):
- ignore_list_file = ".airflowignore"
- found = list(find_path_from_directory(test_dir, ignore_list_file))
-
- assert os.path.join(test_dir, "symlink", "hello_world.py") in found
- assert os.path.join(test_dir, "folder", "hello_world.py") not in found
-
- def test_find_path_from_directory_respects_symlinks_glob_ignore(self, test_dir):
- ignore_list_file = ".airflowignore"
- found = list(find_path_from_directory(test_dir, ignore_list_file, ignore_file_syntax="glob"))
-
- assert os.path.join(test_dir, "symlink", "hello_world.py") in found
- assert os.path.join(test_dir, "folder", "hello_world.py") not in found
-
- def test_find_path_from_directory_fails_on_recursive_link(self, test_dir):
- # add a recursive link
- recursing_src = os.path.join(test_dir, "folder2", "recursor")
- recursing_tgt = os.path.join(test_dir, "folder2")
- os.mkdir(recursing_tgt)
- os.symlink(recursing_tgt, recursing_src)
-
- ignore_list_file = ".airflowignore"
-
- error_message = (
- f"Detected recursive loop when walking DAG directory {test_dir}: "
- f"{Path(recursing_tgt).resolve()} has appeared more than once."
- )
- with pytest.raises(RuntimeError, match=error_message):
- list(find_path_from_directory(test_dir, ignore_list_file, ignore_file_syntax="glob"))
-
def test_might_contain_dag_with_default_callable(self):
file_path_with_dag = os.path.join(TEST_DAGS_FOLDER, "test_scheduler_dags.py")
assert file_utils.might_contain_dag(file_path=file_path_with_dag, safe_mode=True)
- def test_airflowignore_negation_unignore_subfolder_file_glob(self, tmp_path):
- """Ensure negation rules can unignore a subfolder and a file inside it when using glob syntax.
-
- Patterns:
- * -> ignore everything
- !subfolder/ -> unignore the subfolder (must match directory rule)
- !subfolder/keep.py -> unignore a specific file inside the subfolder
- """
- dags_root = tmp_path / "dags"
- (dags_root / "subfolder").mkdir(parents=True)
- # files
- (dags_root / "drop.py").write_text("raise Exception('ignored')\n")
- (dags_root / "subfolder" / "keep.py").write_text("# should be discovered\n")
- (dags_root / "subfolder" / "drop.py").write_text("raise Exception('ignored')\n")
-
- (dags_root / ".airflowignore").write_text(
- "\n".join(
- [
- "*",
- "!subfolder/",
- "!subfolder/keep.py",
- ]
- )
- )
-
- detected = set()
- for raw in find_path_from_directory(dags_root, ".airflowignore", "glob"):
- p = Path(raw)
- if p.is_file() and p.suffix == ".py":
- detected.add(p.relative_to(dags_root).as_posix())
-
- assert detected == {"subfolder/keep.py"}
-
- def test_airflowignore_negation_nested_with_globstar(self, tmp_path):
- """Negation with ** should work for nested subfolders."""
- dags_root = tmp_path / "dags"
- nested = dags_root / "a" / "b" / "subfolder"
- nested.mkdir(parents=True)
-
- # files
- (dags_root / "ignore_top.py").write_text("raise Exception('ignored')\n")
- (nested / "keep.py").write_text("# should be discovered\n")
- (nested / "drop.py").write_text("raise Exception('ignored')\n")
-
- (dags_root / ".airflowignore").write_text(
- "\n".join(
- [
- "*",
- "!a/",
- "!a/b/",
- "!**/subfolder/",
- "!**/subfolder/keep.py",
- "drop.py",
- ]
- )
- )
-
- detected = set()
- for raw in find_path_from_directory(dags_root, ".airflowignore", "glob"):
- p = Path(raw)
- if p.is_file() and p.suffix == ".py":
- detected.add(p.relative_to(dags_root).as_posix())
-
- assert detected == {"a/b/subfolder/keep.py"}
-
@conf_vars({("core", "might_contain_dag_callable"): "unit.utils.test_file.might_contain_dag"})
def test_might_contain_dag(self):
"""Test might_contain_dag_callable"""
diff --git a/shared/module_loading/pyproject.toml b/shared/module_loading/pyproject.toml
index 84a25aa0f57..52fc5e636c0 100644
--- a/shared/module_loading/pyproject.toml
+++ b/shared/module_loading/pyproject.toml
@@ -25,6 +25,7 @@ classifiers = [
dependencies = [
'importlib_metadata>=6.5;python_version<"3.12"',
+ "pathspec>=0.9.0",
]
[dependency-groups]
diff --git a/shared/module_loading/src/airflow_shared/module_loading/__init__.py b/shared/module_loading/src/airflow_shared/module_loading/__init__.py
index 07022cbf487..3268dc3ddc2 100644
--- a/shared/module_loading/src/airflow_shared/module_loading/__init__.py
+++ b/shared/module_loading/src/airflow_shared/module_loading/__init__.py
@@ -26,6 +26,10 @@ from collections.abc import Callable, Iterator
from importlib import import_module
from typing import TYPE_CHECKING
+from .file_discovery import (
+ find_path_from_directory as find_path_from_directory,
+)
+
if sys.version_info >= (3, 12):
from importlib import metadata
else:
diff --git a/airflow-core/src/airflow/utils/file.py b/shared/module_loading/src/airflow_shared/module_loading/file_discovery.py
similarity index 59%
copy from airflow-core/src/airflow/utils/file.py
copy to shared/module_loading/src/airflow_shared/module_loading/file_discovery.py
index 76cdf846fe0..8b1536544f8 100644
--- a/airflow-core/src/airflow/utils/file.py
+++ b/shared/module_loading/src/airflow_shared/module_loading/file_discovery.py
@@ -15,28 +15,22 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+"""File discovery utilities for finding files while respecting ignore patterns."""
+
from __future__ import annotations
-import ast
-import hashlib
import logging
import os
import re
-import zipfile
from collections.abc import Generator
-from io import TextIOWrapper
from pathlib import Path
from re import Pattern
-from typing import NamedTuple, Protocol, overload
+from typing import NamedTuple, Protocol
from pathspec.patterns import GitWildMatchPattern
-from airflow.configuration import conf
-
log = logging.getLogger(__name__)
-MODIFIED_DAG_MODULE_NAME = "unusual_prefix_{path_hash}_{module_name}"
-
class _IgnoreRule(Protocol):
"""Interface for ignore rules for structural subtyping."""
@@ -125,45 +119,6 @@ class _GlobIgnoreRule(NamedTuple):
return matched
-ZIP_REGEX = re.compile(rf"((.*\.zip){re.escape(os.sep)})?(.*)")
-
-
-@overload
-def correct_maybe_zipped(fileloc: None) -> None: ...
-
-
-@overload
-def correct_maybe_zipped(fileloc: str | Path) -> str | Path: ...
-
-
-def correct_maybe_zipped(fileloc: None | str | Path) -> None | str | Path:
- """If the path contains a folder with a .zip suffix, treat it as a zip archive and return path."""
- if not fileloc:
- return fileloc
- search_ = ZIP_REGEX.search(str(fileloc))
- if not search_:
- return fileloc
- _, archive, _ = search_.groups()
- if archive and zipfile.is_zipfile(archive):
- return archive
- return fileloc
-
-
-def open_maybe_zipped(fileloc, mode="r"):
- """
- Open the given file.
-
- If the path contains a folder with a .zip suffix, then the folder
- is treated as a zip archive, opening the file inside the archive.
-
- :return: a file object, as in `open`, or as in `ZipFile.open`.
- """
- _, archive, filename = ZIP_REGEX.search(fileloc).groups()
- if archive and zipfile.is_zipfile(archive):
- return TextIOWrapper(zipfile.ZipFile(archive, mode=mode).open(filename))
- return open(fileloc, mode=mode)
-
-
def _find_path_from_directory(
base_dir_path: str | os.PathLike[str],
ignore_file_name: str,
@@ -224,14 +179,14 @@ def _find_path_from_directory(
def find_path_from_directory(
base_dir_path: str | os.PathLike[str],
ignore_file_name: str,
- ignore_file_syntax: str = conf.get_mandatory_value("core", "DAG_IGNORE_FILE_SYNTAX", fallback="glob"),
+ ignore_file_syntax: str = "glob",
) -> Generator[str, None, None]:
"""
Recursively search the base path for a list of file paths that should not
be ignored.
:param base_dir_path: the base path to be searched
:param ignore_file_name: the file name in which specifies the patterns of files/dirs to be ignored
- :param ignore_file_syntax: the syntax of patterns in the ignore file: regexp or glob
+ :param ignore_file_syntax: the syntax of patterns in the ignore file: regexp or glob (default: glob)
:return: a generator of file paths.
"""
@@ -240,116 +195,3 @@ def find_path_from_directory(
if ignore_file_syntax == "regexp":
return _find_path_from_directory(base_dir_path, ignore_file_name, _RegexpIgnoreRule)
raise ValueError(f"Unsupported ignore_file_syntax: {ignore_file_syntax}")
-
-
-def list_py_file_paths(
- directory: str | os.PathLike[str] | None,
- safe_mode: bool = conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE", fallback=True),
-) -> list[str]:
- """
- Traverse a directory and look for Python files.
-
- :param directory: the directory to traverse
- :param safe_mode: whether to use a heuristic to determine whether a file
- contains Airflow DAG definitions. If not provided, use the
- core.DAG_DISCOVERY_SAFE_MODE configuration setting. If not set, default
- to safe.
- :return: a list of paths to Python files in the specified directory
- """
- file_paths: list[str] = []
- if directory is None:
- file_paths = []
- elif os.path.isfile(directory):
- file_paths = [str(directory)]
- elif os.path.isdir(directory):
- file_paths.extend(find_dag_file_paths(directory, safe_mode))
- return file_paths
-
-
-def find_dag_file_paths(directory: str | os.PathLike[str], safe_mode: bool) -> list[str]:
- """Find file paths of all DAG files."""
- file_paths = []
-
- for file_path in find_path_from_directory(directory, ".airflowignore"):
- path = Path(file_path)
- try:
- if path.is_file() and (path.suffix == ".py" or zipfile.is_zipfile(path)):
- if might_contain_dag(file_path, safe_mode):
- file_paths.append(file_path)
- except Exception:
- log.exception("Error while examining %s", file_path)
-
- return file_paths
-
-
-COMMENT_PATTERN = re.compile(r"\s*#.*")
-
-
-def might_contain_dag(file_path: str, safe_mode: bool, zip_file: zipfile.ZipFile | None = None) -> bool:
- """
- Check whether a Python file contains Airflow DAGs.
-
- When safe_mode is off (with False value), this function always returns True.
-
- If might_contain_dag_callable isn't specified, it uses airflow default heuristic
- """
- if not safe_mode:
- return True
-
- might_contain_dag_callable = conf.getimport(
- "core",
- "might_contain_dag_callable",
- fallback="airflow.utils.file.might_contain_dag_via_default_heuristic",
- )
- return might_contain_dag_callable(file_path=file_path, zip_file=zip_file)
-
-
-def might_contain_dag_via_default_heuristic(file_path: str, zip_file: zipfile.ZipFile | None = None) -> bool:
- """
- Heuristic that guesses whether a Python file contains an Airflow DAG definition.
-
- :param file_path: Path to the file to be checked.
- :param zip_file: if passed, checks the archive. Otherwise, check local filesystem.
- :return: True, if file might contain DAGs.
- """
- if zip_file:
- with zip_file.open(file_path) as current_file:
- content = current_file.read()
- else:
- if zipfile.is_zipfile(file_path):
- return True
- with open(file_path, "rb") as dag_file:
- content = dag_file.read()
- content = content.lower()
- if b"airflow" not in content:
- return False
- return any(s in content for s in (b"dag", b"asset"))
-
-
-def _find_imported_modules(module: ast.Module) -> Generator[str, None, None]:
- for st in module.body:
- if isinstance(st, ast.Import):
- for n in st.names:
- yield n.name
- elif isinstance(st, ast.ImportFrom) and st.module is not None:
- yield st.module
-
-
-def iter_airflow_imports(file_path: str) -> Generator[str, None, None]:
- """Find Airflow modules imported in the given file."""
- try:
- parsed = ast.parse(Path(file_path).read_bytes())
- except Exception:
- return
- for m in _find_imported_modules(parsed):
- if m.startswith("airflow."):
- yield m
-
-
-def get_unique_dag_module_name(file_path: str) -> str:
- """Return a unique module name in the format unusual_prefix_{sha1 of module's file path}_{original module name}."""
- if isinstance(file_path, str):
- path_hash = hashlib.sha1(file_path.encode("utf-8"), usedforsecurity=False).hexdigest()
- org_mod_name = re.sub(r"[.-]", "_", Path(file_path).stem)
- return MODIFIED_DAG_MODULE_NAME.format(path_hash=path_hash, module_name=org_mod_name)
- raise ValueError("file_path should be a string to generate unique module name")
diff --git a/shared/module_loading/tests/module_loading/test_file_discovery.py b/shared/module_loading/tests/module_loading/test_file_discovery.py
new file mode 100644
index 00000000000..0c5347dca1d
--- /dev/null
+++ b/shared/module_loading/tests/module_loading/test_file_discovery.py
@@ -0,0 +1,139 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import os
+from pathlib import Path
+
+import pytest
+
+from airflow_shared.module_loading import find_path_from_directory
+
+
+class TestFindPathFromDirectory:
+ @pytest.fixture
+ def test_dir(self, tmp_path):
+ # create test tree with symlinks
+ source = os.path.join(tmp_path, "folder")
+ target = os.path.join(tmp_path, "symlink")
+ py_file = os.path.join(source, "hello_world.py")
+ ignore_file = os.path.join(tmp_path, ".airflowignore")
+ os.mkdir(source)
+ os.symlink(source, target)
+ # write ignore files
+ with open(ignore_file, "w") as f:
+ f.write("folder")
+ # write sample pyfile
+ with open(py_file, "w") as f:
+ f.write("print('hello world')")
+ return tmp_path
+
+ def test_find_path_from_directory_respects_symlinks_regexp_ignore(self, test_dir):
+ ignore_list_file = ".airflowignore"
+ found = list(find_path_from_directory(test_dir, ignore_list_file, "regexp"))
+
+ assert os.path.join(test_dir, "symlink", "hello_world.py") in found
+ assert os.path.join(test_dir, "folder", "hello_world.py") not in found
+
+ def test_find_path_from_directory_respects_symlinks_glob_ignore(self, test_dir):
+ ignore_list_file = ".airflowignore"
+ found = list(find_path_from_directory(test_dir, ignore_list_file, ignore_file_syntax="glob"))
+
+ assert os.path.join(test_dir, "symlink", "hello_world.py") in found
+ assert os.path.join(test_dir, "folder", "hello_world.py") not in found
+
+ def test_find_path_from_directory_fails_on_recursive_link(self, test_dir):
+ # add a recursive link
+ recursing_src = os.path.join(test_dir, "folder2", "recursor")
+ recursing_tgt = os.path.join(test_dir, "folder2")
+ os.mkdir(recursing_tgt)
+ os.symlink(recursing_tgt, recursing_src)
+
+ ignore_list_file = ".airflowignore"
+
+ error_message = (
+ f"Detected recursive loop when walking DAG directory {test_dir}: "
+ f"{Path(recursing_tgt).resolve()} has appeared more than once."
+ )
+ with pytest.raises(RuntimeError, match=error_message):
+ list(find_path_from_directory(test_dir, ignore_list_file, ignore_file_syntax="glob"))
+
+ def test_airflowignore_negation_unignore_subfolder_file_glob(self, tmp_path):
+ """Ensure negation rules can unignore a subfolder and a file inside it when using glob syntax.
+
+ Patterns:
+ * -> ignore everything
+ !subfolder/ -> unignore the subfolder (must match directory rule)
+ !subfolder/keep.py -> unignore a specific file inside the subfolder
+ """
+ dags_root = tmp_path / "dags"
+ (dags_root / "subfolder").mkdir(parents=True)
+ # files
+ (dags_root / "drop.py").write_text("raise Exception('ignored')\n")
+ (dags_root / "subfolder" / "keep.py").write_text("# should be discovered\n")
+ (dags_root / "subfolder" / "drop.py").write_text("raise Exception('ignored')\n")
+
+ (dags_root / ".airflowignore").write_text(
+ "\n".join(
+ [
+ "*",
+ "!subfolder/",
+ "!subfolder/keep.py",
+ ]
+ )
+ )
+
+ detected = set()
+ for raw in find_path_from_directory(dags_root, ".airflowignore", "glob"):
+ p = Path(raw)
+ if p.is_file() and p.suffix == ".py":
+ detected.add(p.relative_to(dags_root).as_posix())
+
+ assert detected == {"subfolder/keep.py"}
+
+ def test_airflowignore_negation_nested_with_globstar(self, tmp_path):
+ """Negation with ** should work for nested subfolders."""
+ dags_root = tmp_path / "dags"
+ nested = dags_root / "a" / "b" / "subfolder"
+ nested.mkdir(parents=True)
+
+ # files
+ (dags_root / "ignore_top.py").write_text("raise Exception('ignored')\n")
+ (nested / "keep.py").write_text("# should be discovered\n")
+ (nested / "drop.py").write_text("raise Exception('ignored')\n")
+
+ (dags_root / ".airflowignore").write_text(
+ "\n".join(
+ [
+ "*",
+ "!a/",
+ "!a/b/",
+ "!**/subfolder/",
+ "!**/subfolder/keep.py",
+ "drop.py",
+ ]
+ )
+ )
+
+ detected = set()
+ for raw in find_path_from_directory(dags_root, ".airflowignore", "glob"):
+ p = Path(raw)
+ if p.is_file() and p.suffix == ".py":
+ detected.add(p.relative_to(dags_root).as_posix())
+
+ assert detected == {"a/b/subfolder/keep.py"}
diff --git a/task-sdk/pyproject.toml b/task-sdk/pyproject.toml
index 91991509710..3479c0c04c4 100644
--- a/task-sdk/pyproject.toml
+++ b/task-sdk/pyproject.toml
@@ -79,6 +79,7 @@ dependencies = [
# End of shared configuration dependencies
# Start of shared module-loading dependencies
'importlib_metadata>=6.5;python_version<"3.12"',
+ "pathspec>=0.9.0",
# End of shared module-loading dependencies
]