soooojinlee commented on code in PR #60999: URL: https://github.com/apache/airflow/pull/60999#discussion_r2971731842
########## task-sdk/src/airflow/sdk/definitions/_internal/templater.py: ########## @@ -58,6 +65,235 @@ def resolve(self, context: Context) -> Any: log = logging.getLogger(__name__) +# This loader addresses the issue where template files in zipped DAG packages +# could not be resolved by the standard FileSystemLoader. +# See: https://github.com/apache/airflow/issues/59310 +class ZipAwareFileSystemLoader(jinja2.FileSystemLoader): + """ + A Jinja2 template loader that supports resolving templates from zipped DAG packages. + + Search paths may include filesystem directories, zip files, or subdirectories + within zip files. Searchpath ordering is preserved across zip and non-zip entries. + """ + + def __init__( + self, + searchpath: str | os.PathLike[str] | Sequence[str | os.PathLike[str]], + encoding: str = "utf-8", + followlinks: bool = False, + ) -> None: + # Convert to list first to process + if isinstance(searchpath, (str, os.PathLike)): + searchpath = [searchpath] + all_paths = [os.fspath(p) for p in searchpath] + + # Separate zip paths from regular paths at initialization time (once) + # Store zip info by index to preserve searchpath order + self._zip_path_map: dict[int, tuple[str, str]] = {} # {index: (archive_path, internal_base_path)} + regular_paths: list[str] = [] + + for idx, path in enumerate(all_paths): + zip_info = self._parse_zip_path(path) + if zip_info: + self._zip_path_map[idx] = zip_info + else: + regular_paths.append(path) + + # Store regular paths for filesystem lookups + self._regular_searchpaths = regular_paths + + # Initialize parent with regular paths only (empty list is OK for our use case) + # We override get_source anyway, so parent's searchpath is only used for list_templates + super().__init__(regular_paths if regular_paths else [], encoding, followlinks) + + # Store all paths for reference and error messages + self._all_searchpaths = all_paths + self.searchpath = all_paths + + @staticmethod + def _parse_zip_path(path: str) -> tuple[str, str] | None: + """ + Parse a path to extract zip archive and internal path components. + + :param path: The path to parse + :return: Tuple of (archive_path, internal_base_path) if path is a zip path, + None otherwise + """ + # Check if the path itself is a zip file (no internal path) + if path.endswith(".zip") and os.path.isfile(path) and zipfile.is_zipfile(path): + return (path, "") + + # Check for paths inside a zip (e.g., "archive.zip/subdir" or "archive.zip\subdir") + match = ZIP_REGEX.match(path) + if match: + archive, internal = match.groups() + if archive and os.path.isfile(archive) and zipfile.is_zipfile(archive): + return (archive, internal or "") + + return None + + def _read_from_zip(self, archive_path: str, internal_path: str) -> str: + """ + Read a file from inside a zip archive. + + :param archive_path: Path to the zip file + :param internal_path: Path to the file inside the zip + :return: The file contents as a string + :raises TemplateNotFound: If the file doesn't exist in the zip + """ + try: + with zipfile.ZipFile(archive_path, "r") as zf: + # Normalize path separators for zip (always forward slashes) + normalized_path = internal_path.replace(os.sep, "/") + with zf.open(normalized_path) as f: + return f.read().decode(self.encoding) + except KeyError as exc: + raise jinja2.TemplateNotFound(internal_path) from exc + except (OSError, zipfile.BadZipFile) as exc: + raise jinja2.TemplateNotFound( + f"{internal_path} (error reading from {archive_path}: {exc})" + ) from exc + + def _get_source_from_single_zip( + self, archive_path: str, base_internal_path: str, template: str + ) -> tuple[str, str, Callable[[], bool]] | None: + """ + Try to get template source from a single zip archive. + + :param archive_path: Path to the zip file + :param base_internal_path: Base path inside the zip (may be empty) + :param template: The name of the template to load + :return: A tuple of (source, filename, up_to_date_func) if found, None otherwise + """ + import posixpath + + from jinja2.loaders import split_template_path + + pieces = split_template_path(template) + if base_internal_path: + internal_path = posixpath.join(base_internal_path, *pieces) + else: + internal_path = "/".join(pieces) + + try: + source = self._read_from_zip(archive_path, internal_path) + filename = os.path.join(archive_path, internal_path) + + archive_mtime = os.path.getmtime(archive_path) + + def up_to_date(archive: str = archive_path, mtime: float = archive_mtime) -> bool: + try: + return os.path.getmtime(archive) == mtime + except OSError: + return False + + return source, filename, up_to_date + except jinja2.TemplateNotFound: + return None + + def _get_source_from_filesystem( Review Comment: Good point. Removed `_get_source_from_filesystem` and now delegate each non-zip path to a per-path `jinja2.FileSystemLoader` instance, preserving searchpath order while using Jinja2's own lookup logic. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
