gopidesupavan commented on code in PR #60999: URL: https://github.com/apache/airflow/pull/60999#discussion_r2969379570
########## task-sdk/src/airflow/sdk/definitions/_internal/templater.py: ########## @@ -58,6 +65,235 @@ def resolve(self, context: Context) -> Any: log = logging.getLogger(__name__) +# This loader addresses the issue where template files in zipped DAG packages +# could not be resolved by the standard FileSystemLoader. +# See: https://github.com/apache/airflow/issues/59310 +class ZipAwareFileSystemLoader(jinja2.FileSystemLoader): + """ + A Jinja2 template loader that supports resolving templates from zipped DAG packages. + + Search paths may include filesystem directories, zip files, or subdirectories + within zip files. Searchpath ordering is preserved across zip and non-zip entries. + """ + + def __init__( + self, + searchpath: str | os.PathLike[str] | Sequence[str | os.PathLike[str]], + encoding: str = "utf-8", + followlinks: bool = False, + ) -> None: + # Convert to list first to process + if isinstance(searchpath, (str, os.PathLike)): + searchpath = [searchpath] + all_paths = [os.fspath(p) for p in searchpath] + + # Separate zip paths from regular paths at initialization time (once) + # Store zip info by index to preserve searchpath order + self._zip_path_map: dict[int, tuple[str, str]] = {} # {index: (archive_path, internal_base_path)} + regular_paths: list[str] = [] + + for idx, path in enumerate(all_paths): + zip_info = self._parse_zip_path(path) + if zip_info: + self._zip_path_map[idx] = zip_info + else: + regular_paths.append(path) + + # Store regular paths for filesystem lookups + self._regular_searchpaths = regular_paths + + # Initialize parent with regular paths only (empty list is OK for our use case) + # We override get_source anyway, so parent's searchpath is only used for list_templates + super().__init__(regular_paths if regular_paths else [], encoding, followlinks) + + # Store all paths for reference and error messages + self._all_searchpaths = all_paths + self.searchpath = all_paths + + @staticmethod + def _parse_zip_path(path: str) -> tuple[str, 
str] | None: + """ + Parse a path to extract zip archive and internal path components. + + :param path: The path to parse + :return: Tuple of (archive_path, internal_base_path) if path is a zip path, + None otherwise + """ + # Check if the path itself is a zip file (no internal path) + if path.endswith(".zip") and os.path.isfile(path) and zipfile.is_zipfile(path): Review Comment: This zip detection is case-sensitive, so valid archives like `dags.ZIP` fall back to the filesystem path instead of being treated as zipped searchpaths. The Python DAG importer in `airflow-core/src/airflow/dag_processing/importers/python_importer.py` accepts any valid zip via `zipfile.is_zipfile()`, so this creates a mismatch where the DAG can be imported successfully but template loading breaks. ########## task-sdk/src/airflow/sdk/definitions/_internal/templater.py: ########## @@ -58,6 +65,235 @@ def resolve(self, context: Context) -> Any: log = logging.getLogger(__name__) +# This loader addresses the issue where template files in zipped DAG packages +# could not be resolved by the standard FileSystemLoader. +# See: https://github.com/apache/airflow/issues/59310 +class ZipAwareFileSystemLoader(jinja2.FileSystemLoader): + """ + A Jinja2 template loader that supports resolving templates from zipped DAG packages. + + Search paths may include filesystem directories, zip files, or subdirectories + within zip files. Searchpath ordering is preserved across zip and non-zip entries. 
+ """ + + def __init__( + self, + searchpath: str | os.PathLike[str] | Sequence[str | os.PathLike[str]], + encoding: str = "utf-8", + followlinks: bool = False, + ) -> None: + # Convert to list first to process + if isinstance(searchpath, (str, os.PathLike)): + searchpath = [searchpath] + all_paths = [os.fspath(p) for p in searchpath] + + # Separate zip paths from regular paths at initialization time (once) + # Store zip info by index to preserve searchpath order + self._zip_path_map: dict[int, tuple[str, str]] = {} # {index: (archive_path, internal_base_path)} + regular_paths: list[str] = [] + + for idx, path in enumerate(all_paths): + zip_info = self._parse_zip_path(path) + if zip_info: + self._zip_path_map[idx] = zip_info + else: + regular_paths.append(path) + + # Store regular paths for filesystem lookups + self._regular_searchpaths = regular_paths + + # Initialize parent with regular paths only (empty list is OK for our use case) + # We override get_source anyway, so parent's searchpath is only used for list_templates + super().__init__(regular_paths if regular_paths else [], encoding, followlinks) + + # Store all paths for reference and error messages + self._all_searchpaths = all_paths + self.searchpath = all_paths + + @staticmethod + def _parse_zip_path(path: str) -> tuple[str, str] | None: + """ + Parse a path to extract zip archive and internal path components. 
+ + :param path: The path to parse + :return: Tuple of (archive_path, internal_base_path) if path is a zip path, + None otherwise + """ + # Check if the path itself is a zip file (no internal path) + if path.endswith(".zip") and os.path.isfile(path) and zipfile.is_zipfile(path): + return (path, "") + + # Check for paths inside a zip (e.g., "archive.zip/subdir" or "archive.zip\subdir") + match = ZIP_REGEX.match(path) + if match: + archive, internal = match.groups() + if archive and os.path.isfile(archive) and zipfile.is_zipfile(archive): + return (archive, internal or "") + + return None + + def _read_from_zip(self, archive_path: str, internal_path: str) -> str: + """ + Read a file from inside a zip archive. + + :param archive_path: Path to the zip file + :param internal_path: Path to the file inside the zip + :return: The file contents as a string + :raises TemplateNotFound: If the file doesn't exist in the zip + """ + try: + with zipfile.ZipFile(archive_path, "r") as zf: + # Normalize path separators for zip (always forward slashes) + normalized_path = internal_path.replace(os.sep, "/") + with zf.open(normalized_path) as f: + return f.read().decode(self.encoding) + except KeyError as exc: + raise jinja2.TemplateNotFound(internal_path) from exc + except (OSError, zipfile.BadZipFile) as exc: + raise jinja2.TemplateNotFound( + f"{internal_path} (error reading from {archive_path}: {exc})" + ) from exc + + def _get_source_from_single_zip( + self, archive_path: str, base_internal_path: str, template: str + ) -> tuple[str, str, Callable[[], bool]] | None: + """ + Try to get template source from a single zip archive. 
+ + :param archive_path: Path to the zip file + :param base_internal_path: Base path inside the zip (may be empty) + :param template: The name of the template to load + :return: A tuple of (source, filename, up_to_date_func) if found, None otherwise + """ + import posixpath + + from jinja2.loaders import split_template_path + + pieces = split_template_path(template) + if base_internal_path: + internal_path = posixpath.join(base_internal_path, *pieces) + else: + internal_path = "/".join(pieces) + + try: + source = self._read_from_zip(archive_path, internal_path) + filename = os.path.join(archive_path, internal_path) + + archive_mtime = os.path.getmtime(archive_path) + + def up_to_date(archive: str = archive_path, mtime: float = archive_mtime) -> bool: + try: + return os.path.getmtime(archive) == mtime + except OSError: + return False + + return source, filename, up_to_date + except jinja2.TemplateNotFound: + return None + + def _get_source_from_filesystem( Review Comment: Would it be simpler to delegate regular filesystem lookups back to `jinja2.FileSystemLoader` here instead of reimplementing them? This class really only needs custom behavior for zip-backed searchpaths, and delegating the non-zip path would preserve Jinja's existing path handling, which helps in edge cases. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
