Copilot commented on code in PR #60999:
URL: https://github.com/apache/airflow/pull/60999#discussion_r3066485904


##########
task-sdk/src/airflow/sdk/definitions/dag.py:
##########
@@ -816,15 +816,30 @@ def resolve_template_files(self):
             if hasattr(t, "resolve_template_files"):
                 t.resolve_template_files()
 
-    def get_template_env(self, *, force_sandboxed: bool = False) -> 
jinja2.Environment:
-        """Build a Jinja2 environment."""
-        from airflow.sdk.definitions._internal.templater import 
create_template_env
+    def _get_resolved_searchpath(self) -> list[str]:
+        """
+        Return searchpath with relative paths resolved for zipped DAGs.
+
+        For zipped DAGs, relative template_searchpath entries (e.g., 
``["templates"]``)
+        are resolved against the DAG folder (the zip file path).
+        """
+        from airflow.sdk.definitions._internal.templater import 
_has_zip_extension
 

Review Comment:
   New local import inside `_get_resolved_searchpath()` (`from 
airflow.sdk.definitions._internal.templater import _has_zip_extension`) is 
executed every call. Unless this is specifically to avoid a circular import or 
heavy dependency, prefer moving it to module scope (or colocating the helper in 
`dag.py`) to match standard import patterns and reduce per-call overhead.



##########
task-sdk/src/airflow/sdk/definitions/_internal/templater.py:
##########
@@ -58,6 +71,201 @@ def resolve(self, context: Context) -> Any:
 log = logging.getLogger(__name__)
 
 
+# This loader addresses the issue where template files in zipped DAG packages
+# could not be resolved by the standard FileSystemLoader.
+# See: https://github.com/apache/airflow/issues/59310
+class ZipAwareFileSystemLoader(jinja2.FileSystemLoader):
+    """
+    A Jinja2 template loader that supports resolving templates from zipped DAG 
packages.
+
+    Search paths may include filesystem directories, zip files, or 
subdirectories
+    within zip files. Searchpath ordering is preserved across zip and non-zip 
entries.
+    """
+
+    def __init__(
+        self,
+        searchpath: str | os.PathLike[str] | Sequence[str | os.PathLike[str]],
+        encoding: str = "utf-8",
+        followlinks: bool = False,
+    ) -> None:
+        # Convert to list first to process
+        if isinstance(searchpath, (str, os.PathLike)):
+            searchpath = [searchpath]
+        all_paths = [os.fspath(p) for p in searchpath]
+
+        # Separate zip paths from regular paths at initialization time (once)
+        # Store zip info by index to preserve searchpath order
+        self._zip_path_map: dict[int, tuple[str, str]] = {}  # {index: 
(archive_path, internal_base_path)}
+        # Per-path FileSystemLoader for non-zip paths, delegating to Jinja2's 
own logic
+        self._fs_loaders: dict[int, jinja2.FileSystemLoader] = {}
+        regular_paths: list[str] = []
+
+        for idx, path in enumerate(all_paths):
+            zip_info = self._parse_zip_path(path)
+            if zip_info:
+                self._zip_path_map[idx] = zip_info
+            else:
+                regular_paths.append(path)
+                self._fs_loaders[idx] = jinja2.FileSystemLoader([path], 
encoding, followlinks)
+
+        # Initialize parent with regular paths only (empty list is OK for our 
use case)
+        # We override get_source anyway, so parent's searchpath is only used 
for list_templates
+        super().__init__(regular_paths if regular_paths else [], encoding, 
followlinks)
+
+        # Store all paths for reference and error messages
+        self._all_searchpaths = all_paths
+        self.searchpath = all_paths
+
+    @staticmethod
+    def _parse_zip_path(path: str) -> tuple[str, str] | None:
+        """
+        Parse a path to extract zip archive and internal path components.
+
+        :param path: The path to parse
+        :return: Tuple of (archive_path, internal_base_path) if path is a zip 
path,
+                 None otherwise
+        """
+        # Check if the path itself is a zip file (no internal path)
+        if _has_zip_extension(path) and os.path.isfile(path) and 
zipfile.is_zipfile(path):
+            return (path, "")
+
+        # Check for paths inside a zip (e.g., "archive.zip/subdir" or 
"archive.zip\subdir")
+        match = ZIP_REGEX.match(path)
+        if match:
+            archive, internal = match.groups()
+            if archive and os.path.isfile(archive) and 
zipfile.is_zipfile(archive):
+                return (archive, internal or "")
+

Review Comment:
   `_parse_zip_path()` returns `internal_base_path` directly from the 
user-provided searchpath and it is later joined into `internal_path` without 
any traversal/absolute-path validation. This means a searchpath like 
`archive.zip/../templates` would be accepted and can produce `internal_path` 
values containing `..`, which is inconsistent with the stated goal of applying 
FileSystemLoader-equivalent path validation. Consider normalizing 
`internal_base_path` to POSIX form and rejecting absolute paths or any `..` 
segments before storing it in `_zip_path_map` (raising `TemplateNotFound` when 
encountered).



##########
task-sdk/src/airflow/sdk/definitions/_internal/templater.py:
##########
@@ -58,6 +71,201 @@ def resolve(self, context: Context) -> Any:
 log = logging.getLogger(__name__)
 
 
+# This loader addresses the issue where template files in zipped DAG packages
+# could not be resolved by the standard FileSystemLoader.
+# See: https://github.com/apache/airflow/issues/59310
+class ZipAwareFileSystemLoader(jinja2.FileSystemLoader):
+    """
+    A Jinja2 template loader that supports resolving templates from zipped DAG 
packages.
+
+    Search paths may include filesystem directories, zip files, or 
subdirectories
+    within zip files. Searchpath ordering is preserved across zip and non-zip 
entries.
+    """
+
+    def __init__(
+        self,
+        searchpath: str | os.PathLike[str] | Sequence[str | os.PathLike[str]],
+        encoding: str = "utf-8",
+        followlinks: bool = False,
+    ) -> None:
+        # Convert to list first to process
+        if isinstance(searchpath, (str, os.PathLike)):
+            searchpath = [searchpath]
+        all_paths = [os.fspath(p) for p in searchpath]
+
+        # Separate zip paths from regular paths at initialization time (once)
+        # Store zip info by index to preserve searchpath order
+        self._zip_path_map: dict[int, tuple[str, str]] = {}  # {index: 
(archive_path, internal_base_path)}
+        # Per-path FileSystemLoader for non-zip paths, delegating to Jinja2's 
own logic
+        self._fs_loaders: dict[int, jinja2.FileSystemLoader] = {}
+        regular_paths: list[str] = []
+
+        for idx, path in enumerate(all_paths):
+            zip_info = self._parse_zip_path(path)
+            if zip_info:
+                self._zip_path_map[idx] = zip_info
+            else:
+                regular_paths.append(path)
+                self._fs_loaders[idx] = jinja2.FileSystemLoader([path], 
encoding, followlinks)
+
+        # Initialize parent with regular paths only (empty list is OK for our 
use case)
+        # We override get_source anyway, so parent's searchpath is only used 
for list_templates
+        super().__init__(regular_paths if regular_paths else [], encoding, 
followlinks)
+
+        # Store all paths for reference and error messages
+        self._all_searchpaths = all_paths
+        self.searchpath = all_paths
+
+    @staticmethod
+    def _parse_zip_path(path: str) -> tuple[str, str] | None:
+        """
+        Parse a path to extract zip archive and internal path components.
+
+        :param path: The path to parse
+        :return: Tuple of (archive_path, internal_base_path) if path is a zip 
path,
+                 None otherwise
+        """
+        # Check if the path itself is a zip file (no internal path)
+        if _has_zip_extension(path) and os.path.isfile(path) and 
zipfile.is_zipfile(path):
+            return (path, "")
+
+        # Check for paths inside a zip (e.g., "archive.zip/subdir" or 
"archive.zip\subdir")
+        match = ZIP_REGEX.match(path)
+        if match:
+            archive, internal = match.groups()
+            if archive and os.path.isfile(archive) and 
zipfile.is_zipfile(archive):
+                return (archive, internal or "")
+
+        return None
+
+    def _read_from_zip(self, archive_path: str, internal_path: str) -> str:
+        """
+        Read a file from inside a zip archive.
+
+        :param archive_path: Path to the zip file
+        :param internal_path: Path to the file inside the zip
+        :return: The file contents as a string
+        :raises TemplateNotFound: If the file doesn't exist in the zip
+        """
+        try:
+            with zipfile.ZipFile(archive_path, "r") as zf:
+                # Normalize path separators for zip (always forward slashes)
+                normalized_path = internal_path.replace(os.sep, "/")
+                with zf.open(normalized_path) as f:
+                    return f.read().decode(self.encoding)
+        except KeyError as exc:
+            raise jinja2.TemplateNotFound(internal_path) from exc
+        except (OSError, zipfile.BadZipFile) as exc:
+            raise jinja2.TemplateNotFound(
+                f"{internal_path} (error reading from {archive_path}: {exc})"
+            ) from exc
+
+    def _get_source_from_single_zip(
+        self, archive_path: str, base_internal_path: str, template: str
+    ) -> tuple[str, str, Callable[[], bool]] | None:
+        """
+        Try to get template source from a single zip archive.
+
+        :param archive_path: Path to the zip file
+        :param base_internal_path: Base path inside the zip (may be empty)
+        :param template: The name of the template to load
+        :return: A tuple of (source, filename, up_to_date_func) if found, None 
otherwise
+        """
+        import posixpath
+
+        from jinja2.loaders import split_template_path
+

Review Comment:
   Imports inside `_get_source_from_single_zip()` (`import posixpath` and `from 
jinja2.loaders import split_template_path`) run on every template lookup. These 
can be moved to module scope since they don’t appear to be guarding circular 
imports or optional dependencies, improving readability and avoiding repeated 
work on a hot path.



##########
task-sdk/src/airflow/sdk/definitions/_internal/templater.py:
##########
@@ -58,6 +71,201 @@ def resolve(self, context: Context) -> Any:
 log = logging.getLogger(__name__)
 
 
+# This loader addresses the issue where template files in zipped DAG packages
+# could not be resolved by the standard FileSystemLoader.
+# See: https://github.com/apache/airflow/issues/59310
+class ZipAwareFileSystemLoader(jinja2.FileSystemLoader):
+    """
+    A Jinja2 template loader that supports resolving templates from zipped DAG 
packages.
+
+    Search paths may include filesystem directories, zip files, or 
subdirectories
+    within zip files. Searchpath ordering is preserved across zip and non-zip 
entries.
+    """
+
+    def __init__(
+        self,
+        searchpath: str | os.PathLike[str] | Sequence[str | os.PathLike[str]],
+        encoding: str = "utf-8",
+        followlinks: bool = False,
+    ) -> None:
+        # Convert to list first to process
+        if isinstance(searchpath, (str, os.PathLike)):
+            searchpath = [searchpath]
+        all_paths = [os.fspath(p) for p in searchpath]
+
+        # Separate zip paths from regular paths at initialization time (once)
+        # Store zip info by index to preserve searchpath order
+        self._zip_path_map: dict[int, tuple[str, str]] = {}  # {index: 
(archive_path, internal_base_path)}
+        # Per-path FileSystemLoader for non-zip paths, delegating to Jinja2's 
own logic
+        self._fs_loaders: dict[int, jinja2.FileSystemLoader] = {}
+        regular_paths: list[str] = []
+
+        for idx, path in enumerate(all_paths):
+            zip_info = self._parse_zip_path(path)
+            if zip_info:
+                self._zip_path_map[idx] = zip_info
+            else:
+                regular_paths.append(path)
+                self._fs_loaders[idx] = jinja2.FileSystemLoader([path], 
encoding, followlinks)
+
+        # Initialize parent with regular paths only (empty list is OK for our 
use case)
+        # We override get_source anyway, so parent's searchpath is only used 
for list_templates
+        super().__init__(regular_paths if regular_paths else [], encoding, 
followlinks)
+
+        # Store all paths for reference and error messages
+        self._all_searchpaths = all_paths
+        self.searchpath = all_paths
+
+    @staticmethod
+    def _parse_zip_path(path: str) -> tuple[str, str] | None:
+        """
+        Parse a path to extract zip archive and internal path components.
+
+        :param path: The path to parse
+        :return: Tuple of (archive_path, internal_base_path) if path is a zip 
path,
+                 None otherwise
+        """
+        # Check if the path itself is a zip file (no internal path)
+        if _has_zip_extension(path) and os.path.isfile(path) and 
zipfile.is_zipfile(path):
+            return (path, "")
+
+        # Check for paths inside a zip (e.g., "archive.zip/subdir" or 
"archive.zip\subdir")
+        match = ZIP_REGEX.match(path)
+        if match:
+            archive, internal = match.groups()
+            if archive and os.path.isfile(archive) and 
zipfile.is_zipfile(archive):
+                return (archive, internal or "")
+
+        return None
+
+    def _read_from_zip(self, archive_path: str, internal_path: str) -> str:
+        """
+        Read a file from inside a zip archive.
+
+        :param archive_path: Path to the zip file
+        :param internal_path: Path to the file inside the zip
+        :return: The file contents as a string
+        :raises TemplateNotFound: If the file doesn't exist in the zip
+        """
+        try:
+            with zipfile.ZipFile(archive_path, "r") as zf:
+                # Normalize path separators for zip (always forward slashes)
+                normalized_path = internal_path.replace(os.sep, "/")
+                with zf.open(normalized_path) as f:
+                    return f.read().decode(self.encoding)
+        except KeyError as exc:
+            raise jinja2.TemplateNotFound(internal_path) from exc
+        except (OSError, zipfile.BadZipFile) as exc:
+            raise jinja2.TemplateNotFound(
+                f"{internal_path} (error reading from {archive_path}: {exc})"

Review Comment:
   In `_read_from_zip()`, the `TemplateNotFound` raised for I/O/BadZipFile 
errors uses a formatted string as the exception’s first argument. This makes 
the exception’s `.name` something other than the requested template path. 
Prefer raising `TemplateNotFound(internal_path, message=...)` (or similar) so 
the missing template name remains accurate while still surfacing the underlying 
read error.
   



##########
task-sdk/tests/task_sdk/definitions/test_dag.py:
##########
@@ -941,3 +945,385 @@ def test_cycle_task_group_with_edge_labels(self):
                 op1 >> Label("label") >> op2
 
         assert not dag.check_cycle()
+
+
[email protected]
+def zipped_dag_with_template(tmp_path: Path) -> tuple[str, str]:
+    """
+    Create a zip file containing a DAG and a SQL template file.
+
+    Structure:
+        test_dag.zip/
+        ├── test_dag.py
+        └── templates/
+            └── query.sql
+
+    Returns:
+        tuple: (path to zip file, path to DAG file within zip)
+    """
+    zip_path = tmp_path / "test_dag.zip"
+
+    # Create the zip file with DAG and template
+    with zipfile.ZipFile(zip_path, "w") as zf:
+        # Add a simple DAG file
+        dag_content = """
+from airflow import DAG
+from datetime import datetime
+
+with DAG(dag_id="test_zip_dag", start_date=datetime(2024, 1, 1)) as dag:
+    pass
+"""
+        zf.writestr("test_dag.py", dag_content)
+
+        # Add template SQL file in subdirectory
+        sql_content = "SELECT column_a FROM {{ params.table_name }}"
+        zf.writestr("templates/query.sql", sql_content)
+
+        # Also add a template at root level
+        zf.writestr("root_query.sql", "SELECT * FROM {{ params.table }}")
+
+    # Return zip path and the fileloc as it would appear for a DAG in the zip
+    dag_fileloc = os.path.join(str(zip_path), "test_dag.py")
+    return str(zip_path), dag_fileloc
+
+
+class TestZippedDagTemplateResolution:
+    """
+    Test suite for template resolution in zipped DAGs.
+
+    These tests verify that ZipAwareFileSystemLoader correctly loads
+    templates from within zip archives, fixing Issue #59310.
+    """
+
+    def test_dag_folder_property_for_zipped_dag(self, 
zipped_dag_with_template):
+        """
+        Test that DAG.folder returns the zip file path for zipped DAGs.
+
+        For a DAG with fileloc='/path/to/test.zip/dag.py',
+        folder should return '/path/to/test.zip'.
+        """
+        zip_path, dag_fileloc = zipped_dag_with_template
+
+        dag = DAG(
+            dag_id="test_zipped_dag",
+            schedule=None,
+            start_date=DEFAULT_ZIP_DATE,
+        )
+        # Manually set fileloc as it would be set when loading from zip
+        dag.fileloc = dag_fileloc
+
+        # The folder property uses os.path.dirname(fileloc)
+        expected_folder = zip_path
+        assert dag.folder == expected_folder
+
+    def test_template_file_loads_from_zipped_dag(self, 
zipped_dag_with_template):
+        """
+        Test that template files can be loaded from within zip archives.
+
+        This verifies the fix for Issue #59310 - templates inside zipped DAGs
+        should be accessible via ZipAwareFileSystemLoader.
+        """
+        _, dag_fileloc = zipped_dag_with_template
+
+        dag = DAG(
+            dag_id="test_zipped_dag",
+            schedule=None,
+            start_date=DEFAULT_ZIP_DATE,
+        )
+        dag.fileloc = dag_fileloc
+
+        # Get the Jinja2 environment from the DAG
+        jinja_env = dag.get_template_env()
+
+        # Load template from subdirectory inside zip
+        template = jinja_env.get_template("templates/query.sql")
+        rendered = template.render(params={"table_name": "users"})
+        assert rendered == "SELECT column_a FROM users"
+
+    def test_root_level_template_loads_from_zipped_dag(self, 
zipped_dag_with_template):
+        """
+        Test that templates at the root level of the zip can be loaded.
+        """
+        _, dag_fileloc = zipped_dag_with_template
+
+        dag = DAG(
+            dag_id="test_zipped_dag",
+            schedule=None,
+            start_date=DEFAULT_ZIP_DATE,
+        )
+        dag.fileloc = dag_fileloc
+
+        jinja_env = dag.get_template_env()
+
+        # Load template from root of zip
+        template = jinja_env.get_template("root_query.sql")
+        rendered = template.render(params={"table": "orders"})
+        assert rendered == "SELECT * FROM orders"
+
+    def test_template_searchpath_in_zipped_dag(self, zipped_dag_with_template):
+        """
+        Test that template_searchpath works correctly for zipped DAGs.
+
+        When template_searchpath points to a subdirectory inside the zip,
+        templates should be resolved relative to that path.
+        """
+        zip_path, dag_fileloc = zipped_dag_with_template
+
+        # Set template_searchpath to the templates folder inside the zip
+        template_path = os.path.join(zip_path, "templates")
+
+        dag = DAG(
+            dag_id="test_zipped_dag",
+            schedule=None,
+            start_date=DEFAULT_ZIP_DATE,
+            template_searchpath=[template_path],
+        )
+        dag.fileloc = dag_fileloc
+
+        jinja_env = dag.get_template_env()
+
+        # Template should be found relative to the searchpath
+        template = jinja_env.get_template("query.sql")
+        rendered = template.render(params={"table_name": "products"})
+        assert rendered == "SELECT column_a FROM products"
+
+    def test_template_searchpath_relative_path_in_zipped_dag(self, 
zipped_dag_with_template):
+        """
+        Test that relative template_searchpath works correctly for zipped DAGs.
+
+        This is the core fix for Issue #59310 - when users specify:
+            template_searchpath=["templates"]
+        it should resolve to "{zip_path}/templates" automatically.
+        """
+        zip_path, dag_fileloc = zipped_dag_with_template
+
+        dag = DAG(
+            dag_id="test_zipped_dag",
+            schedule=None,
+            start_date=DEFAULT_ZIP_DATE,
+            template_searchpath=["templates"],
+        )
+        dag.fileloc = dag_fileloc
+
+        jinja_env = dag.get_template_env()
+        template = jinja_env.get_template("query.sql")
+        rendered = template.render(params={"table_name": "users"})
+        assert rendered == "SELECT column_a FROM users"
+
+    def test_regular_dag_template_resolution_works(self, tmp_path: Path):
+        """
+        Test that template resolution works correctly for regular (non-zipped) 
DAGs.
+
+        This ensures ZipAwareFileSystemLoader maintains backward compatibility
+        with regular filesystem paths.
+        """
+        # Create a regular directory structure (not zipped)
+        dag_dir = tmp_path / "dags"
+        dag_dir.mkdir()
+        template_dir = dag_dir / "templates"
+        template_dir.mkdir()
+
+        # Create template file
+        sql_file = template_dir / "query.sql"
+        sql_file.write_text("SELECT * FROM {{ params.table }}")
+
+        # Create DAG file
+        dag_file = dag_dir / "test_dag.py"
+        dag_file.write_text("# DAG file")
+
+        dag = DAG(
+            dag_id="test_regular_dag",
+            schedule=None,
+            start_date=DEFAULT_ZIP_DATE,
+        )
+        dag.fileloc = str(dag_file)
+
+        jinja_env = dag.get_template_env()
+
+        # This should work for regular directories
+        template = jinja_env.get_template("templates/query.sql")
+        rendered = template.render(params={"table": "users"})
+        assert rendered == "SELECT * FROM users"
+
+
+class TestOperatorTemplateFieldsInZippedDag:
+    """
+    Tests for resolving template fields in operators for zipped DAGs.
+    """
+
+    class DummyOperator(BaseOperator):
+        template_fields = ("sql", "query_list")
+        template_ext = (".sql",)
+
+        def __init__(self, sql: str, query_list: list[str] | None = None, 
**kwargs):
+            super().__init__(**kwargs)
+            self.sql = sql
+            self.query_list = query_list or []
+
+        def execute(self, context): ...
+
+    def test_operator_template_fields_resolve_in_zipped_dag(self, 
zipped_dag_with_template):
+        """
+        Test that operator template fields are resolved for zipped DAGs.
+        """
+        _, dag_fileloc = zipped_dag_with_template
+
+        dag = DAG(
+            dag_id="test_zipped_dag",
+            schedule=None,
+            start_date=DEFAULT_ZIP_DATE,
+        )
+        dag.fileloc = dag_fileloc
+
+        # Operator with SQL file reference
+        task = self.DummyOperator(task_id="test_task", 
sql="templates/query.sql")
+        task.dag = dag
+
+        # Resolve template files
+        task.resolve_template_files()
+
+        # SQL should be resolved to file content
+        assert "SELECT column_a FROM" in task.sql
+
+    def test_operator_multiple_template_files_in_zipped_dag(self, 
zipped_dag_with_template):
+        """
+        Test that multiple template files in list fields are resolved.
+        """
+        _, dag_fileloc = zipped_dag_with_template
+
+        dag = DAG(
+            dag_id="test_zipped_dag",
+            schedule=None,
+            start_date=DEFAULT_ZIP_DATE,
+        )
+        dag.fileloc = dag_fileloc
+
+        task = self.DummyOperator(
+            task_id="test_task",
+            sql="templates/query.sql",
+            query_list=["templates/query.sql", "root_query.sql"],
+        )
+        task.dag = dag
+
+        # Resolve template files
+        task.resolve_template_files()
+
+        # Both template files should be resolved
+        assert "SELECT column_a FROM" in task.sql
+        assert "SELECT column_a FROM" in task.query_list[0]
+        assert "SELECT * FROM" in task.query_list[1]
+
+
+class TestZipAwareFileSystemLoader:
+    """
+    Tests for the ZipAwareFileSystemLoader class directly.
+    """
+
+    def test_loader_list_templates_in_zip(self, zipped_dag_with_template):
+        """
+        Test that list_templates() returns templates from zip archives.
+        """
+        from airflow.sdk.definitions._internal.templater import 
ZipAwareFileSystemLoader
+
+        zip_path, _ = zipped_dag_with_template
+
+        loader = ZipAwareFileSystemLoader([zip_path])
+        templates = loader.list_templates()
+
+        assert "test_dag.py" in templates
+        assert "templates/query.sql" in templates
+        assert "root_query.sql" in templates
+
+    def test_loader_mixed_searchpath(self, tmp_path: Path, 
zipped_dag_with_template):
+        """
+        Test that the loader works with mixed searchpaths (zip and regular 
directories).
+        """
+        import jinja2
+
+        from airflow.sdk.definitions._internal.templater import 
ZipAwareFileSystemLoader
+
+        zip_path, _ = zipped_dag_with_template
+
+        # Create a regular directory with a template
+        regular_dir = tmp_path / "regular_templates"
+        regular_dir.mkdir()
+        (regular_dir / "regular.sql").write_text("SELECT 1")
+
+        loader = ZipAwareFileSystemLoader([zip_path, str(regular_dir)])
+        env = jinja2.Environment(loader=loader)
+
+        # Should load from zip
+        template1 = env.get_template("templates/query.sql")
+        assert "SELECT column_a" in template1.render(params={"table_name": 
"test"})
+
+        # Should load from regular directory
+        template2 = env.get_template("regular.sql")
+        assert template2.render() == "SELECT 1"
+
+    def test_loader_respects_searchpath_order(self, tmp_path: Path, 
zipped_dag_with_template):
+        """
+        Test that searchpath order is respected when templates overlap.
+        """
+        import jinja2
+
+        from airflow.sdk.definitions._internal.templater import 
ZipAwareFileSystemLoader
+
+        zip_path, _ = zipped_dag_with_template
+
+        # Create a regular directory with a template that matches the zip path
+        regular_dir = tmp_path / "regular_templates"
+        (regular_dir / "templates").mkdir(parents=True)
+        (regular_dir / "templates" / "query.sql").write_text("SELECT 42")
+
+        # Regular path first should win
+        loader = ZipAwareFileSystemLoader([str(regular_dir), zip_path])
+        env = jinja2.Environment(loader=loader)
+        template = env.get_template("templates/query.sql")
+        assert template.render() == "SELECT 42"
+
+    def test_zip_path_traversal_is_blocked(self, zipped_dag_with_template):
+        """
+        Test that path traversal attempts are blocked inside zip paths.
+        """
+        import jinja2
+
+        from airflow.sdk.definitions._internal.templater import 
ZipAwareFileSystemLoader
+
+        zip_path, _ = zipped_dag_with_template
+        loader = ZipAwareFileSystemLoader([zip_path])
+        env = jinja2.Environment(loader=loader)
+
+        with pytest.raises(jinja2.TemplateNotFound):
+            env.get_template("../templates/query.sql")
+
+    def test_loader_case_insensitive_zip_extension(self, tmp_path: Path):
+        """Test that zip detection works with uppercase .ZIP extension."""
+        import jinja2
+
+        from airflow.sdk.definitions._internal.templater import 
ZipAwareFileSystemLoader
+
+        zip_path = tmp_path / "test_dag.ZIP"
+        with zipfile.ZipFile(zip_path, "w") as zf:
+            zf.writestr("templates/query.sql", "SELECT 1 FROM {{ params.table 
}}")
+
+        loader = ZipAwareFileSystemLoader([str(zip_path)])
+        env = jinja2.Environment(loader=loader)
+        template = env.get_template("templates/query.sql")
+        assert "SELECT 1" in template.render(params={"table": "test"})
+
+    def test_resolved_searchpath_case_insensitive_zip(self, tmp_path: Path):
+        """Test that _get_resolved_searchpath detects .ZIP extension."""
+        zip_path = tmp_path / "dags.ZIP"
+        with zipfile.ZipFile(zip_path, "w") as zf:
+            zf.writestr("dag.py", "pass")
+            zf.writestr("templates/query.sql", "SELECT 1")
+
+        dag_fileloc = os.path.join(str(zip_path), "dag.py")
+        dag = DAG(dag_id="test_zip_case", schedule=None, 
start_date=datetime(2024, 1, 1))
+        dag.fileloc = dag_fileloc
+
+        searchpath = dag._get_resolved_searchpath()

Review Comment:
   In `test_resolved_searchpath_case_insensitive_zip`, the first `searchpath = 
dag._get_resolved_searchpath()` call isn’t asserted against and is immediately 
overwritten. If the intent is to cover the default behavior, consider asserting 
expected contents (e.g. that the zip path is present) or removing the unused 
call to keep the test focused.
   



##########
task-sdk/src/airflow/sdk/definitions/_internal/templater.py:
##########
@@ -58,6 +71,201 @@ def resolve(self, context: Context) -> Any:
 log = logging.getLogger(__name__)
 
 
+# This loader addresses the issue where template files in zipped DAG packages
+# could not be resolved by the standard FileSystemLoader.
+# See: https://github.com/apache/airflow/issues/59310
+class ZipAwareFileSystemLoader(jinja2.FileSystemLoader):
+    """
+    A Jinja2 template loader that supports resolving templates from zipped DAG 
packages.
+
+    Search paths may include filesystem directories, zip files, or 
subdirectories
+    within zip files. Searchpath ordering is preserved across zip and non-zip 
entries.
+    """
+
+    def __init__(
+        self,
+        searchpath: str | os.PathLike[str] | Sequence[str | os.PathLike[str]],
+        encoding: str = "utf-8",
+        followlinks: bool = False,
+    ) -> None:
+        # Convert to list first to process
+        if isinstance(searchpath, (str, os.PathLike)):
+            searchpath = [searchpath]
+        all_paths = [os.fspath(p) for p in searchpath]
+
+        # Separate zip paths from regular paths at initialization time (once)
+        # Store zip info by index to preserve searchpath order
+        self._zip_path_map: dict[int, tuple[str, str]] = {}  # {index: 
(archive_path, internal_base_path)}
+        # Per-path FileSystemLoader for non-zip paths, delegating to Jinja2's 
own logic
+        self._fs_loaders: dict[int, jinja2.FileSystemLoader] = {}
+        regular_paths: list[str] = []
+
+        for idx, path in enumerate(all_paths):
+            zip_info = self._parse_zip_path(path)
+            if zip_info:
+                self._zip_path_map[idx] = zip_info
+            else:
+                regular_paths.append(path)
+                self._fs_loaders[idx] = jinja2.FileSystemLoader([path], 
encoding, followlinks)
+
+        # Initialize parent with regular paths only (empty list is OK for our 
use case)
+        # We override get_source anyway, so parent's searchpath is only used 
for list_templates
+        super().__init__(regular_paths if regular_paths else [], encoding, 
followlinks)
+
+        # Store all paths for reference and error messages
+        self._all_searchpaths = all_paths
+        self.searchpath = all_paths
+
+    @staticmethod
+    def _parse_zip_path(path: str) -> tuple[str, str] | None:
+        """
+        Parse a path to extract zip archive and internal path components.
+
+        :param path: The path to parse
+        :return: Tuple of (archive_path, internal_base_path) if path is a zip 
path,
+                 None otherwise
+        """
+        # Check if the path itself is a zip file (no internal path)
+        if _has_zip_extension(path) and os.path.isfile(path) and 
zipfile.is_zipfile(path):
+            return (path, "")
+
+        # Check for paths inside a zip (e.g., "archive.zip/subdir" or 
"archive.zip\subdir")
+        match = ZIP_REGEX.match(path)
+        if match:
+            archive, internal = match.groups()
+            if archive and os.path.isfile(archive) and 
zipfile.is_zipfile(archive):
+                return (archive, internal or "")
+
+        return None
+
+    def _read_from_zip(self, archive_path: str, internal_path: str) -> str:
+        """
+        Read a file from inside a zip archive.
+
+        :param archive_path: Path to the zip file
+        :param internal_path: Path to the file inside the zip
+        :return: The file contents as a string
+        :raises TemplateNotFound: If the file doesn't exist in the zip
+        """
+        try:
+            with zipfile.ZipFile(archive_path, "r") as zf:
+                # Normalize path separators for zip (always forward slashes)
+                normalized_path = internal_path.replace(os.sep, "/")
+                with zf.open(normalized_path) as f:
+                    return f.read().decode(self.encoding)
+        except KeyError as exc:
+            raise jinja2.TemplateNotFound(internal_path) from exc
+        except (OSError, zipfile.BadZipFile) as exc:
+            raise jinja2.TemplateNotFound(
+                f"{internal_path} (error reading from {archive_path}: {exc})"
+            ) from exc
+
+    def _get_source_from_single_zip(
+        self, archive_path: str, base_internal_path: str, template: str
+    ) -> tuple[str, str, Callable[[], bool]] | None:
+        """
+        Try to get template source from a single zip archive.
+
+        :param archive_path: Path to the zip file
+        :param base_internal_path: Base path inside the zip (may be empty)
+        :param template: The name of the template to load
+        :return: A tuple of (source, filename, up_to_date_func) if found, None 
otherwise
+        """
+        import posixpath
+
+        from jinja2.loaders import split_template_path
+
+        pieces = split_template_path(template)
+        if base_internal_path:
+            internal_path = posixpath.join(base_internal_path, *pieces)
+        else:
+            internal_path = "/".join(pieces)
+
+        try:
+            source = self._read_from_zip(archive_path, internal_path)
+            filename = os.path.join(archive_path, internal_path)
+
+            archive_mtime = os.path.getmtime(archive_path)
+
+            def up_to_date(archive: str = archive_path, mtime: float = 
archive_mtime) -> bool:
+                try:
+                    return os.path.getmtime(archive) == mtime
+                except OSError:
+                    return False
+
+            return source, filename, up_to_date
+        except jinja2.TemplateNotFound:
+            return None
+
+    def get_source(
+        self, environment: jinja2.Environment, template: str
+    ) -> tuple[str, str, Callable[[], bool]]:
+        """
+        Get the template source, filename, and reload helper for a template.
+
+        Searches through searchpaths in order, handling both zip archives and
+        regular filesystem paths according to their original order.
+
+        :param environment: The Jinja2 environment
+        :param template: The name of the template to load
+        :return: A tuple of (source, filename, up_to_date_func)
+        :raises TemplateNotFound: If the template cannot be found
+        """
+        for idx, _path in enumerate(self._all_searchpaths):
+            if idx in self._zip_path_map:
+                archive_path, base_internal_path = self._zip_path_map[idx]
+                result = self._get_source_from_single_zip(archive_path, 
base_internal_path, template)
+                if result:
+                    return result
+            elif idx in self._fs_loaders:
+                try:
+                    return self._fs_loaders[idx].get_source(environment, 
template)
+                except jinja2.TemplateNotFound:
+                    continue
+
+        # Template not found in any searchpath
+        raise jinja2.TemplateNotFound(
+            f"'{template}' not found in search path: {', '.join(repr(p) for p 
in self._all_searchpaths)}"

Review Comment:
   `get_source()` raises `jinja2.TemplateNotFound` with a formatted message 
string (including searchpaths). In Jinja2, the first argument is treated as the 
template *name*, so this changes `.name` from the requested template to the 
whole message and can break callers/tooling that rely on the template name. 
Consider raising `TemplateNotFound(template)` and, if you want extra context, 
pass it via the optional message argument (or log it) rather than replacing the 
name.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to