This is an automated email from the ASF dual-hosted git repository.

amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8badad15411 Move find_path_from_directory to shared module_loading library (#60114)
8badad15411 is described below

commit 8badad1541132d50a8b716c3f9715f0b620da72f
Author: Amogh Desai <[email protected]>
AuthorDate: Mon Jan 5 14:16:48 2026 +0530

    Move find_path_from_directory to shared module_loading library (#60114)
---
 airflow-core/src/airflow/plugins_manager.py        |  13 +-
 airflow-core/src/airflow/utils/file.py             | 193 +++------------------
 .../tests/unit/plugins/test_plugin_ignore.py       |   2 +-
 airflow-core/tests/unit/utils/test_file.py         | 115 +-----------
 shared/module_loading/pyproject.toml               |   1 +
 .../src/airflow_shared/module_loading/__init__.py  |   4 +
 .../module_loading/file_discovery.py               | 168 +-----------------
 .../tests/module_loading/test_file_discovery.py    | 139 +++++++++++++++
 task-sdk/pyproject.toml                            |   1 +
 9 files changed, 184 insertions(+), 452 deletions(-)

diff --git a/airflow-core/src/airflow/plugins_manager.py b/airflow-core/src/airflow/plugins_manager.py
index 40cfe9bd25d..fd4b538f36f 100644
--- a/airflow-core/src/airflow/plugins_manager.py
+++ b/airflow-core/src/airflow/plugins_manager.py
@@ -32,13 +32,17 @@ from pathlib import Path
 from typing import TYPE_CHECKING, Any
 
 from airflow import settings
-from airflow._shared.module_loading import entry_points_with_dist, 
import_string, qualname
+from airflow._shared.module_loading import (
+    entry_points_with_dist,
+    find_path_from_directory,
+    import_string,
+    qualname,
+)
 from airflow.configuration import conf
 from airflow.task.priority_strategy import (
     PriorityWeightStrategy,
     airflow_priority_weight_strategies,
 )
-from airflow.utils.file import find_path_from_directory
 
 if TYPE_CHECKING:
     from airflow.lineage.hook import HookLineageReader
@@ -205,7 +209,8 @@ def _load_plugins_from_plugin_directory() -> 
tuple[list[AirflowPlugin], dict[str
     if settings.PLUGINS_FOLDER is None:
         raise ValueError("Plugins folder is not set")
     log.debug("Loading plugins from directory: %s", settings.PLUGINS_FOLDER)
-    files = find_path_from_directory(settings.PLUGINS_FOLDER, ".airflowignore")
+    ignore_file_syntax = conf.get_mandatory_value("core", 
"DAG_IGNORE_FILE_SYNTAX", fallback="glob")
+    files = find_path_from_directory(settings.PLUGINS_FOLDER, 
".airflowignore", ignore_file_syntax)
     plugin_search_locations: list[tuple[str, Generator[str, None, None]]] = 
[("", files)]
 
     if conf.getboolean("core", "LOAD_EXAMPLES"):
@@ -213,7 +218,7 @@ def _load_plugins_from_plugin_directory() -> 
tuple[list[AirflowPlugin], dict[str
         from airflow.example_dags import plugins as example_plugins
 
         example_plugins_folder = next(iter(example_plugins.__path__))
-        example_files = find_path_from_directory(example_plugins_folder, 
".airflowignore")
+        example_files = find_path_from_directory(example_plugins_folder, 
".airflowignore", ignore_file_syntax)
         plugin_search_locations.append((example_plugins.__name__, 
example_files))
 
     plugins: list[AirflowPlugin] = []
diff --git a/airflow-core/src/airflow/utils/file.py b/airflow-core/src/airflow/utils/file.py
index 76cdf846fe0..c614cfff0ad 100644
--- a/airflow-core/src/airflow/utils/file.py
+++ b/airflow-core/src/airflow/utils/file.py
@@ -26,10 +26,7 @@ import zipfile
 from collections.abc import Generator
 from io import TextIOWrapper
 from pathlib import Path
-from re import Pattern
-from typing import NamedTuple, Protocol, overload
-
-from pathspec.patterns import GitWildMatchPattern
+from typing import overload
 
 from airflow.configuration import conf
 
@@ -38,93 +35,6 @@ log = logging.getLogger(__name__)
 MODIFIED_DAG_MODULE_NAME = "unusual_prefix_{path_hash}_{module_name}"
 
 
-class _IgnoreRule(Protocol):
-    """Interface for ignore rules for structural subtyping."""
-
-    @staticmethod
-    def compile(pattern: str, base_dir: Path, definition_file: Path) -> 
_IgnoreRule | None:
-        """
-        Build an ignore rule from the supplied pattern.
-
-        ``base_dir`` and ``definition_file`` should be absolute paths.
-        """
-
-    @staticmethod
-    def match(path: Path, rules: list[_IgnoreRule]) -> bool:
-        """Match a candidate absolute path against a list of rules."""
-
-
-class _RegexpIgnoreRule(NamedTuple):
-    """Typed namedtuple with utility functions for regexp ignore rules."""
-
-    pattern: Pattern
-    base_dir: Path
-
-    @staticmethod
-    def compile(pattern: str, base_dir: Path, definition_file: Path) -> 
_IgnoreRule | None:
-        """Build an ignore rule from the supplied regexp pattern and log a 
useful warning if it is invalid."""
-        try:
-            return _RegexpIgnoreRule(re.compile(pattern), base_dir)
-        except re.error as e:
-            log.warning("Ignoring invalid regex '%s' from %s: %s", pattern, 
definition_file, e)
-            return None
-
-    @staticmethod
-    def match(path: Path, rules: list[_IgnoreRule]) -> bool:
-        """Match a list of ignore rules against the supplied path."""
-        for rule in rules:
-            if not isinstance(rule, _RegexpIgnoreRule):
-                raise ValueError(f"_RegexpIgnoreRule cannot match rules of 
type: {type(rule)}")
-            if rule.pattern.search(str(path.relative_to(rule.base_dir))) is 
not None:
-                return True
-        return False
-
-
-class _GlobIgnoreRule(NamedTuple):
-    """Typed namedtuple with utility functions for glob ignore rules."""
-
-    wild_match_pattern: GitWildMatchPattern
-    relative_to: Path | None = None
-
-    @staticmethod
-    def compile(pattern: str, base_dir: Path, definition_file: Path) -> 
_IgnoreRule | None:
-        """Build an ignore rule from the supplied glob pattern and log a 
useful warning if it is invalid."""
-        relative_to: Path | None = None
-        if pattern.strip() == "/":
-            # "/" doesn't match anything in gitignore
-            log.warning("Ignoring no-op glob pattern '/' from %s", 
definition_file)
-            return None
-        if pattern.startswith("/") or "/" in pattern.rstrip("/"):
-            # See https://git-scm.com/docs/gitignore
-            # > If there is a separator at the beginning or middle (or both) 
of the pattern, then the
-            # > pattern is relative to the directory level of the particular 
.gitignore file itself.
-            # > Otherwise the pattern may also match at any level below the 
.gitignore level.
-            relative_to = definition_file.parent
-
-        ignore_pattern = GitWildMatchPattern(pattern)
-        return _GlobIgnoreRule(wild_match_pattern=ignore_pattern, 
relative_to=relative_to)
-
-    @staticmethod
-    def match(path: Path, rules: list[_IgnoreRule]) -> bool:
-        """Match a list of ignore rules against the supplied path, accounting 
for exclusion rules and ordering."""
-        matched = False
-        for rule in rules:
-            if not isinstance(rule, _GlobIgnoreRule):
-                raise ValueError(f"_GlobIgnoreRule cannot match rules of type: 
{type(rule)}")
-            rel_obj = path.relative_to(rule.relative_to) if rule.relative_to 
else Path(path.name)
-            if path.is_dir():
-                rel_path = f"{rel_obj.as_posix()}/"
-            else:
-                rel_path = rel_obj.as_posix()
-            if (
-                rule.wild_match_pattern.include is not None
-                and rule.wild_match_pattern.match_file(rel_path) is not None
-            ):
-                matched = rule.wild_match_pattern.include
-
-        return matched
-
-
 ZIP_REGEX = re.compile(rf"((.*\.zip){re.escape(os.sep)})?(.*)")
 
 
@@ -164,84 +74,6 @@ def open_maybe_zipped(fileloc, mode="r"):
     return open(fileloc, mode=mode)
 
 
-def _find_path_from_directory(
-    base_dir_path: str | os.PathLike[str],
-    ignore_file_name: str,
-    ignore_rule_type: type[_IgnoreRule],
-) -> Generator[str, None, None]:
-    """
-    Recursively search the base path and return the list of file paths that 
should not be ignored.
-
-    :param base_dir_path: the base path to be searched
-    :param ignore_file_name: the file name containing regular expressions for 
files that should be ignored.
-    :param ignore_rule_type: the concrete class for ignore rules, which 
implements the _IgnoreRule interface.
-
-    :return: a generator of file paths which should not be ignored.
-    """
-    # A Dict of patterns, keyed using resolved, absolute paths
-    patterns_by_dir: dict[Path, list[_IgnoreRule]] = {}
-
-    for root, dirs, files in os.walk(base_dir_path, followlinks=True):
-        patterns: list[_IgnoreRule] = 
patterns_by_dir.get(Path(root).resolve(), [])
-
-        ignore_file_path = Path(root) / ignore_file_name
-        if ignore_file_path.is_file():
-            with open(ignore_file_path) as ifile:
-                patterns_to_match_excluding_comments = [
-                    re.sub(r"\s*#.*", "", line) for line in 
ifile.read().split("\n")
-                ]
-                # append new patterns and filter out "None" objects, which are 
invalid patterns
-                patterns += [
-                    p
-                    for p in [
-                        ignore_rule_type.compile(pattern, Path(base_dir_path), 
ignore_file_path)
-                        for pattern in patterns_to_match_excluding_comments
-                        if pattern
-                    ]
-                    if p is not None
-                ]
-                # evaluation order of patterns is important with negation
-                # so that later patterns can override earlier patterns
-
-        dirs[:] = [subdir for subdir in dirs if not 
ignore_rule_type.match(Path(root) / subdir, patterns)]
-        # explicit loop for infinite recursion detection since we are 
following symlinks in this walk
-        for sd in dirs:
-            dirpath = (Path(root) / sd).resolve()
-            if dirpath in patterns_by_dir:
-                raise RuntimeError(
-                    "Detected recursive loop when walking DAG directory "
-                    f"{base_dir_path}: {dirpath} has appeared more than once."
-                )
-            patterns_by_dir.update({dirpath: patterns.copy()})
-
-        for file in files:
-            if file != ignore_file_name:
-                abs_file_path = Path(root) / file
-                if not ignore_rule_type.match(abs_file_path, patterns):
-                    yield str(abs_file_path)
-
-
-def find_path_from_directory(
-    base_dir_path: str | os.PathLike[str],
-    ignore_file_name: str,
-    ignore_file_syntax: str = conf.get_mandatory_value("core", 
"DAG_IGNORE_FILE_SYNTAX", fallback="glob"),
-) -> Generator[str, None, None]:
-    """
-    Recursively search the base path for a list of file paths that should not 
be ignored.
-
-    :param base_dir_path: the base path to be searched
-    :param ignore_file_name: the file name in which specifies the patterns of 
files/dirs to be ignored
-    :param ignore_file_syntax: the syntax of patterns in the ignore file: 
regexp or glob
-
-    :return: a generator of file paths.
-    """
-    if ignore_file_syntax == "glob" or not ignore_file_syntax:
-        return _find_path_from_directory(base_dir_path, ignore_file_name, 
_GlobIgnoreRule)
-    if ignore_file_syntax == "regexp":
-        return _find_path_from_directory(base_dir_path, ignore_file_name, 
_RegexpIgnoreRule)
-    raise ValueError(f"Unsupported ignore_file_syntax: {ignore_file_syntax}")
-
-
 def list_py_file_paths(
     directory: str | os.PathLike[str] | None,
     safe_mode: bool = conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE", 
fallback=True),
@@ -268,9 +100,12 @@ def list_py_file_paths(
 
 def find_dag_file_paths(directory: str | os.PathLike[str], safe_mode: bool) -> 
list[str]:
     """Find file paths of all DAG files."""
+    from airflow._shared.module_loading.file_discovery import 
find_path_from_directory
+
     file_paths = []
+    ignore_file_syntax = conf.get_mandatory_value("core", 
"DAG_IGNORE_FILE_SYNTAX", fallback="glob")
 
-    for file_path in find_path_from_directory(directory, ".airflowignore"):
+    for file_path in find_path_from_directory(directory, ".airflowignore", 
ignore_file_syntax):
         path = Path(file_path)
         try:
             if path.is_file() and (path.suffix == ".py" or 
zipfile.is_zipfile(path)):
@@ -353,3 +188,21 @@ def get_unique_dag_module_name(file_path: str) -> str:
         org_mod_name = re.sub(r"[.-]", "_", Path(file_path).stem)
         return MODIFIED_DAG_MODULE_NAME.format(path_hash=path_hash, 
module_name=org_mod_name)
     raise ValueError("file_path should be a string to generate unique module 
name")
+
+
+def __getattr__(name: str):
+    if name == "find_path_from_directory":
+        import warnings
+
+        from airflow._shared.module_loading import find_path_from_directory
+        from airflow.utils.deprecation_tools import DeprecatedImportWarning
+
+        warnings.warn(
+            "Importing find_path_from_directory from airflow.utils.file is 
deprecated "
+            "and will be removed in a future version. "
+            "Use airflow._shared.module_loading.find_path_from_directory 
instead.",
+            DeprecatedImportWarning,
+            stacklevel=2,
+        )
+        return find_path_from_directory
+    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
diff --git a/airflow-core/tests/unit/plugins/test_plugin_ignore.py b/airflow-core/tests/unit/plugins/test_plugin_ignore.py
index 671cb71dacc..68e56abd7d3 100644
--- a/airflow-core/tests/unit/plugins/test_plugin_ignore.py
+++ b/airflow-core/tests/unit/plugins/test_plugin_ignore.py
@@ -21,7 +21,7 @@ from pathlib import Path
 from unittest.mock import patch
 
 from airflow import settings
-from airflow.utils.file import find_path_from_directory
+from airflow._shared.module_loading import find_path_from_directory
 
 
 def populate_dir(root_path):
diff --git a/airflow-core/tests/unit/utils/test_file.py b/airflow-core/tests/unit/utils/test_file.py
index fc46a3a6560..cc55c1ac063 100644
--- a/airflow-core/tests/unit/utils/test_file.py
+++ b/airflow-core/tests/unit/utils/test_file.py
@@ -19,16 +19,15 @@ from __future__ import annotations
 
 import os
 import zipfile
-from pathlib import Path
 from pprint import pformat
 from unittest import mock
 
 import pytest
 
+from airflow._shared.module_loading import find_path_from_directory
 from airflow.utils import file as file_utils
 from airflow.utils.file import (
     correct_maybe_zipped,
-    find_path_from_directory,
     list_py_file_paths,
     open_maybe_zipped,
 )
@@ -97,23 +96,6 @@ class TestOpenMaybeZipped:
 
 
 class TestListPyFilesPath:
-    @pytest.fixture
-    def test_dir(self, tmp_path):
-        # create test tree with symlinks
-        source = os.path.join(tmp_path, "folder")
-        target = os.path.join(tmp_path, "symlink")
-        py_file = os.path.join(source, "hello_world.py")
-        ignore_file = os.path.join(tmp_path, ".airflowignore")
-        os.mkdir(source)
-        os.symlink(source, target)
-        # write ignore files
-        with open(ignore_file, "w") as f:
-            f.write("folder")
-        # write sample pyfile
-        with open(py_file, "w") as f:
-            f.write("print('hello world')")
-        return tmp_path
-
     def test_find_path_from_directory_regex_ignore(self):
         should_ignore = [
             "test_invalid_cron.py",
@@ -154,106 +136,11 @@ class TestListPyFilesPath:
             f"actual_included_filenames: 
{pformat(actual_included_filenames)}\nexpected_included_filenames: 
{pformat(should_not_ignore)}"
         )
 
-    def test_find_path_from_directory_respects_symlinks_regexp_ignore(self, 
test_dir):
-        ignore_list_file = ".airflowignore"
-        found = list(find_path_from_directory(test_dir, ignore_list_file))
-
-        assert os.path.join(test_dir, "symlink", "hello_world.py") in found
-        assert os.path.join(test_dir, "folder", "hello_world.py") not in found
-
-    def test_find_path_from_directory_respects_symlinks_glob_ignore(self, 
test_dir):
-        ignore_list_file = ".airflowignore"
-        found = list(find_path_from_directory(test_dir, ignore_list_file, 
ignore_file_syntax="glob"))
-
-        assert os.path.join(test_dir, "symlink", "hello_world.py") in found
-        assert os.path.join(test_dir, "folder", "hello_world.py") not in found
-
-    def test_find_path_from_directory_fails_on_recursive_link(self, test_dir):
-        # add a recursive link
-        recursing_src = os.path.join(test_dir, "folder2", "recursor")
-        recursing_tgt = os.path.join(test_dir, "folder2")
-        os.mkdir(recursing_tgt)
-        os.symlink(recursing_tgt, recursing_src)
-
-        ignore_list_file = ".airflowignore"
-
-        error_message = (
-            f"Detected recursive loop when walking DAG directory {test_dir}: "
-            f"{Path(recursing_tgt).resolve()} has appeared more than once."
-        )
-        with pytest.raises(RuntimeError, match=error_message):
-            list(find_path_from_directory(test_dir, ignore_list_file, 
ignore_file_syntax="glob"))
-
     def test_might_contain_dag_with_default_callable(self):
         file_path_with_dag = os.path.join(TEST_DAGS_FOLDER, 
"test_scheduler_dags.py")
 
         assert file_utils.might_contain_dag(file_path=file_path_with_dag, 
safe_mode=True)
 
-    def test_airflowignore_negation_unignore_subfolder_file_glob(self, 
tmp_path):
-        """Ensure negation rules can unignore a subfolder and a file inside it 
when using glob syntax.
-
-        Patterns:
-          *                     -> ignore everything
-          !subfolder/           -> unignore the subfolder (must match 
directory rule)
-          !subfolder/keep.py    -> unignore a specific file inside the 
subfolder
-        """
-        dags_root = tmp_path / "dags"
-        (dags_root / "subfolder").mkdir(parents=True)
-        # files
-        (dags_root / "drop.py").write_text("raise Exception('ignored')\n")
-        (dags_root / "subfolder" / "keep.py").write_text("# should be 
discovered\n")
-        (dags_root / "subfolder" / "drop.py").write_text("raise 
Exception('ignored')\n")
-
-        (dags_root / ".airflowignore").write_text(
-            "\n".join(
-                [
-                    "*",
-                    "!subfolder/",
-                    "!subfolder/keep.py",
-                ]
-            )
-        )
-
-        detected = set()
-        for raw in find_path_from_directory(dags_root, ".airflowignore", 
"glob"):
-            p = Path(raw)
-            if p.is_file() and p.suffix == ".py":
-                detected.add(p.relative_to(dags_root).as_posix())
-
-        assert detected == {"subfolder/keep.py"}
-
-    def test_airflowignore_negation_nested_with_globstar(self, tmp_path):
-        """Negation with ** should work for nested subfolders."""
-        dags_root = tmp_path / "dags"
-        nested = dags_root / "a" / "b" / "subfolder"
-        nested.mkdir(parents=True)
-
-        # files
-        (dags_root / "ignore_top.py").write_text("raise 
Exception('ignored')\n")
-        (nested / "keep.py").write_text("# should be discovered\n")
-        (nested / "drop.py").write_text("raise Exception('ignored')\n")
-
-        (dags_root / ".airflowignore").write_text(
-            "\n".join(
-                [
-                    "*",
-                    "!a/",
-                    "!a/b/",
-                    "!**/subfolder/",
-                    "!**/subfolder/keep.py",
-                    "drop.py",
-                ]
-            )
-        )
-
-        detected = set()
-        for raw in find_path_from_directory(dags_root, ".airflowignore", 
"glob"):
-            p = Path(raw)
-            if p.is_file() and p.suffix == ".py":
-                detected.add(p.relative_to(dags_root).as_posix())
-
-        assert detected == {"a/b/subfolder/keep.py"}
-
     @conf_vars({("core", "might_contain_dag_callable"): 
"unit.utils.test_file.might_contain_dag"})
     def test_might_contain_dag(self):
         """Test might_contain_dag_callable"""
diff --git a/shared/module_loading/pyproject.toml b/shared/module_loading/pyproject.toml
index 84a25aa0f57..52fc5e636c0 100644
--- a/shared/module_loading/pyproject.toml
+++ b/shared/module_loading/pyproject.toml
@@ -25,6 +25,7 @@ classifiers = [
 
 dependencies = [
     'importlib_metadata>=6.5;python_version<"3.12"',
+    "pathspec>=0.9.0",
 ]
 
 [dependency-groups]
diff --git a/shared/module_loading/src/airflow_shared/module_loading/__init__.py b/shared/module_loading/src/airflow_shared/module_loading/__init__.py
index 07022cbf487..3268dc3ddc2 100644
--- a/shared/module_loading/src/airflow_shared/module_loading/__init__.py
+++ b/shared/module_loading/src/airflow_shared/module_loading/__init__.py
@@ -26,6 +26,10 @@ from collections.abc import Callable, Iterator
 from importlib import import_module
 from typing import TYPE_CHECKING
 
+from .file_discovery import (
+    find_path_from_directory as find_path_from_directory,
+)
+
 if sys.version_info >= (3, 12):
     from importlib import metadata
 else:
diff --git a/airflow-core/src/airflow/utils/file.py b/shared/module_loading/src/airflow_shared/module_loading/file_discovery.py
similarity index 59%
copy from airflow-core/src/airflow/utils/file.py
copy to shared/module_loading/src/airflow_shared/module_loading/file_discovery.py
index 76cdf846fe0..8b1536544f8 100644
--- a/airflow-core/src/airflow/utils/file.py
+++ b/shared/module_loading/src/airflow_shared/module_loading/file_discovery.py
@@ -15,28 +15,22 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+"""File discovery utilities for finding files while respecting ignore 
patterns."""
+
 from __future__ import annotations
 
-import ast
-import hashlib
 import logging
 import os
 import re
-import zipfile
 from collections.abc import Generator
-from io import TextIOWrapper
 from pathlib import Path
 from re import Pattern
-from typing import NamedTuple, Protocol, overload
+from typing import NamedTuple, Protocol
 
 from pathspec.patterns import GitWildMatchPattern
 
-from airflow.configuration import conf
-
 log = logging.getLogger(__name__)
 
-MODIFIED_DAG_MODULE_NAME = "unusual_prefix_{path_hash}_{module_name}"
-
 
 class _IgnoreRule(Protocol):
     """Interface for ignore rules for structural subtyping."""
@@ -125,45 +119,6 @@ class _GlobIgnoreRule(NamedTuple):
         return matched
 
 
-ZIP_REGEX = re.compile(rf"((.*\.zip){re.escape(os.sep)})?(.*)")
-
-
-@overload
-def correct_maybe_zipped(fileloc: None) -> None: ...
-
-
-@overload
-def correct_maybe_zipped(fileloc: str | Path) -> str | Path: ...
-
-
-def correct_maybe_zipped(fileloc: None | str | Path) -> None | str | Path:
-    """If the path contains a folder with a .zip suffix, treat it as a zip 
archive and return path."""
-    if not fileloc:
-        return fileloc
-    search_ = ZIP_REGEX.search(str(fileloc))
-    if not search_:
-        return fileloc
-    _, archive, _ = search_.groups()
-    if archive and zipfile.is_zipfile(archive):
-        return archive
-    return fileloc
-
-
-def open_maybe_zipped(fileloc, mode="r"):
-    """
-    Open the given file.
-
-    If the path contains a folder with a .zip suffix, then the folder
-    is treated as a zip archive, opening the file inside the archive.
-
-    :return: a file object, as in `open`, or as in `ZipFile.open`.
-    """
-    _, archive, filename = ZIP_REGEX.search(fileloc).groups()
-    if archive and zipfile.is_zipfile(archive):
-        return TextIOWrapper(zipfile.ZipFile(archive, 
mode=mode).open(filename))
-    return open(fileloc, mode=mode)
-
-
 def _find_path_from_directory(
     base_dir_path: str | os.PathLike[str],
     ignore_file_name: str,
@@ -224,14 +179,14 @@ def _find_path_from_directory(
 def find_path_from_directory(
     base_dir_path: str | os.PathLike[str],
     ignore_file_name: str,
-    ignore_file_syntax: str = conf.get_mandatory_value("core", 
"DAG_IGNORE_FILE_SYNTAX", fallback="glob"),
+    ignore_file_syntax: str = "glob",
 ) -> Generator[str, None, None]:
     """
     Recursively search the base path for a list of file paths that should not 
be ignored.
 
     :param base_dir_path: the base path to be searched
     :param ignore_file_name: the file name in which specifies the patterns of 
files/dirs to be ignored
-    :param ignore_file_syntax: the syntax of patterns in the ignore file: 
regexp or glob
+    :param ignore_file_syntax: the syntax of patterns in the ignore file: 
regexp or glob (default: glob)
 
     :return: a generator of file paths.
     """
@@ -240,116 +195,3 @@ def find_path_from_directory(
     if ignore_file_syntax == "regexp":
         return _find_path_from_directory(base_dir_path, ignore_file_name, 
_RegexpIgnoreRule)
     raise ValueError(f"Unsupported ignore_file_syntax: {ignore_file_syntax}")
-
-
-def list_py_file_paths(
-    directory: str | os.PathLike[str] | None,
-    safe_mode: bool = conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE", 
fallback=True),
-) -> list[str]:
-    """
-    Traverse a directory and look for Python files.
-
-    :param directory: the directory to traverse
-    :param safe_mode: whether to use a heuristic to determine whether a file
-        contains Airflow DAG definitions. If not provided, use the
-        core.DAG_DISCOVERY_SAFE_MODE configuration setting. If not set, default
-        to safe.
-    :return: a list of paths to Python files in the specified directory
-    """
-    file_paths: list[str] = []
-    if directory is None:
-        file_paths = []
-    elif os.path.isfile(directory):
-        file_paths = [str(directory)]
-    elif os.path.isdir(directory):
-        file_paths.extend(find_dag_file_paths(directory, safe_mode))
-    return file_paths
-
-
-def find_dag_file_paths(directory: str | os.PathLike[str], safe_mode: bool) -> 
list[str]:
-    """Find file paths of all DAG files."""
-    file_paths = []
-
-    for file_path in find_path_from_directory(directory, ".airflowignore"):
-        path = Path(file_path)
-        try:
-            if path.is_file() and (path.suffix == ".py" or 
zipfile.is_zipfile(path)):
-                if might_contain_dag(file_path, safe_mode):
-                    file_paths.append(file_path)
-        except Exception:
-            log.exception("Error while examining %s", file_path)
-
-    return file_paths
-
-
-COMMENT_PATTERN = re.compile(r"\s*#.*")
-
-
-def might_contain_dag(file_path: str, safe_mode: bool, zip_file: 
zipfile.ZipFile | None = None) -> bool:
-    """
-    Check whether a Python file contains Airflow DAGs.
-
-    When safe_mode is off (with False value), this function always returns 
True.
-
-    If might_contain_dag_callable isn't specified, it uses airflow default 
heuristic
-    """
-    if not safe_mode:
-        return True
-
-    might_contain_dag_callable = conf.getimport(
-        "core",
-        "might_contain_dag_callable",
-        fallback="airflow.utils.file.might_contain_dag_via_default_heuristic",
-    )
-    return might_contain_dag_callable(file_path=file_path, zip_file=zip_file)
-
-
-def might_contain_dag_via_default_heuristic(file_path: str, zip_file: 
zipfile.ZipFile | None = None) -> bool:
-    """
-    Heuristic that guesses whether a Python file contains an Airflow DAG 
definition.
-
-    :param file_path: Path to the file to be checked.
-    :param zip_file: if passed, checks the archive. Otherwise, check local 
filesystem.
-    :return: True, if file might contain DAGs.
-    """
-    if zip_file:
-        with zip_file.open(file_path) as current_file:
-            content = current_file.read()
-    else:
-        if zipfile.is_zipfile(file_path):
-            return True
-        with open(file_path, "rb") as dag_file:
-            content = dag_file.read()
-    content = content.lower()
-    if b"airflow" not in content:
-        return False
-    return any(s in content for s in (b"dag", b"asset"))
-
-
-def _find_imported_modules(module: ast.Module) -> Generator[str, None, None]:
-    for st in module.body:
-        if isinstance(st, ast.Import):
-            for n in st.names:
-                yield n.name
-        elif isinstance(st, ast.ImportFrom) and st.module is not None:
-            yield st.module
-
-
-def iter_airflow_imports(file_path: str) -> Generator[str, None, None]:
-    """Find Airflow modules imported in the given file."""
-    try:
-        parsed = ast.parse(Path(file_path).read_bytes())
-    except Exception:
-        return
-    for m in _find_imported_modules(parsed):
-        if m.startswith("airflow."):
-            yield m
-
-
-def get_unique_dag_module_name(file_path: str) -> str:
-    """Return a unique module name in the format unusual_prefix_{sha1 of 
module's file path}_{original module name}."""
-    if isinstance(file_path, str):
-        path_hash = hashlib.sha1(file_path.encode("utf-8"), 
usedforsecurity=False).hexdigest()
-        org_mod_name = re.sub(r"[.-]", "_", Path(file_path).stem)
-        return MODIFIED_DAG_MODULE_NAME.format(path_hash=path_hash, 
module_name=org_mod_name)
-    raise ValueError("file_path should be a string to generate unique module 
name")
diff --git a/shared/module_loading/tests/module_loading/test_file_discovery.py b/shared/module_loading/tests/module_loading/test_file_discovery.py
new file mode 100644
index 00000000000..0c5347dca1d
--- /dev/null
+++ b/shared/module_loading/tests/module_loading/test_file_discovery.py
@@ -0,0 +1,139 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import os
+from pathlib import Path
+
+import pytest
+
+from airflow_shared.module_loading import find_path_from_directory
+
+
+class TestFindPathFromDirectory:
+    @pytest.fixture
+    def test_dir(self, tmp_path):
+        # create test tree with symlinks
+        source = os.path.join(tmp_path, "folder")
+        target = os.path.join(tmp_path, "symlink")
+        py_file = os.path.join(source, "hello_world.py")
+        ignore_file = os.path.join(tmp_path, ".airflowignore")
+        os.mkdir(source)
+        os.symlink(source, target)
+        # write ignore files
+        with open(ignore_file, "w") as f:
+            f.write("folder")
+        # write sample pyfile
+        with open(py_file, "w") as f:
+            f.write("print('hello world')")
+        return tmp_path
+
+    def test_find_path_from_directory_respects_symlinks_regexp_ignore(self, test_dir):
+        ignore_list_file = ".airflowignore"
+        found = list(find_path_from_directory(test_dir, ignore_list_file, "regexp"))
+
+        assert os.path.join(test_dir, "symlink", "hello_world.py") in found
+        assert os.path.join(test_dir, "folder", "hello_world.py") not in found
+
+    def test_find_path_from_directory_respects_symlinks_glob_ignore(self, test_dir):
+        ignore_list_file = ".airflowignore"
+        found = list(find_path_from_directory(test_dir, ignore_list_file, ignore_file_syntax="glob"))
+
+        assert os.path.join(test_dir, "symlink", "hello_world.py") in found
+        assert os.path.join(test_dir, "folder", "hello_world.py") not in found
+
+    def test_find_path_from_directory_fails_on_recursive_link(self, test_dir):
+        # add a recursive link
+        recursing_src = os.path.join(test_dir, "folder2", "recursor")
+        recursing_tgt = os.path.join(test_dir, "folder2")
+        os.mkdir(recursing_tgt)
+        os.symlink(recursing_tgt, recursing_src)
+
+        ignore_list_file = ".airflowignore"
+
+        error_message = (
+            f"Detected recursive loop when walking DAG directory {test_dir}: "
+            f"{Path(recursing_tgt).resolve()} has appeared more than once."
+        )
+        with pytest.raises(RuntimeError, match=error_message):
+            list(find_path_from_directory(test_dir, ignore_list_file, ignore_file_syntax="glob"))
+
+    def test_airflowignore_negation_unignore_subfolder_file_glob(self, tmp_path):
+        """Ensure negation rules can unignore a subfolder and a file inside it when using glob syntax.
+
+        Patterns:
+          *                     -> ignore everything
+          !subfolder/           -> unignore the subfolder (must match directory rule)
+          !subfolder/keep.py    -> unignore a specific file inside the subfolder
+        """
+        dags_root = tmp_path / "dags"
+        (dags_root / "subfolder").mkdir(parents=True)
+        # files
+        (dags_root / "drop.py").write_text("raise Exception('ignored')\n")
+        (dags_root / "subfolder" / "keep.py").write_text("# should be discovered\n")
+        (dags_root / "subfolder" / "drop.py").write_text("raise Exception('ignored')\n")
+
+        (dags_root / ".airflowignore").write_text(
+            "\n".join(
+                [
+                    "*",
+                    "!subfolder/",
+                    "!subfolder/keep.py",
+                ]
+            )
+        )
+
+        detected = set()
+        for raw in find_path_from_directory(dags_root, ".airflowignore", "glob"):
+            p = Path(raw)
+            if p.is_file() and p.suffix == ".py":
+                detected.add(p.relative_to(dags_root).as_posix())
+
+        assert detected == {"subfolder/keep.py"}
+
+    def test_airflowignore_negation_nested_with_globstar(self, tmp_path):
+        """Negation with ** should work for nested subfolders."""
+        dags_root = tmp_path / "dags"
+        nested = dags_root / "a" / "b" / "subfolder"
+        nested.mkdir(parents=True)
+
+        # files
+        (dags_root / "ignore_top.py").write_text("raise Exception('ignored')\n")
+        (nested / "keep.py").write_text("# should be discovered\n")
+        (nested / "drop.py").write_text("raise Exception('ignored')\n")
+
+        (dags_root / ".airflowignore").write_text(
+            "\n".join(
+                [
+                    "*",
+                    "!a/",
+                    "!a/b/",
+                    "!**/subfolder/",
+                    "!**/subfolder/keep.py",
+                    "drop.py",
+                ]
+            )
+        )
+
+        detected = set()
+        for raw in find_path_from_directory(dags_root, ".airflowignore", "glob"):
+            p = Path(raw)
+            if p.is_file() and p.suffix == ".py":
+                detected.add(p.relative_to(dags_root).as_posix())
+
+        assert detected == {"a/b/subfolder/keep.py"}
diff --git a/task-sdk/pyproject.toml b/task-sdk/pyproject.toml
index 91991509710..3479c0c04c4 100644
--- a/task-sdk/pyproject.toml
+++ b/task-sdk/pyproject.toml
@@ -79,6 +79,7 @@ dependencies = [
     # End of shared configuration dependencies
     # Start of shared module-loading dependencies
     'importlib_metadata>=6.5;python_version<"3.12"',
+    "pathspec>=0.9.0",
     # End of shared module-loading dependencies
 ]
 


Reply via email to