soooojinlee commented on code in PR #60999: URL: https://github.com/apache/airflow/pull/60999#discussion_r2971730429
########## task-sdk/src/airflow/sdk/definitions/_internal/templater.py: ########## @@ -58,6 +65,235 @@ def resolve(self, context: Context) -> Any: log = logging.getLogger(__name__) +# This loader addresses the issue where template files in zipped DAG packages +# could not be resolved by the standard FileSystemLoader. +# See: https://github.com/apache/airflow/issues/59310 +class ZipAwareFileSystemLoader(jinja2.FileSystemLoader): + """ + A Jinja2 template loader that supports resolving templates from zipped DAG packages. + + Search paths may include filesystem directories, zip files, or subdirectories + within zip files. Searchpath ordering is preserved across zip and non-zip entries. + """ + + def __init__( + self, + searchpath: str | os.PathLike[str] | Sequence[str | os.PathLike[str]], + encoding: str = "utf-8", + followlinks: bool = False, + ) -> None: + # Convert to list first to process + if isinstance(searchpath, (str, os.PathLike)): + searchpath = [searchpath] + all_paths = [os.fspath(p) for p in searchpath] + + # Separate zip paths from regular paths at initialization time (once) + # Store zip info by index to preserve searchpath order + self._zip_path_map: dict[int, tuple[str, str]] = {} # {index: (archive_path, internal_base_path)} + regular_paths: list[str] = [] + + for idx, path in enumerate(all_paths): + zip_info = self._parse_zip_path(path) + if zip_info: + self._zip_path_map[idx] = zip_info + else: + regular_paths.append(path) + + # Store regular paths for filesystem lookups + self._regular_searchpaths = regular_paths + + # Initialize parent with regular paths only (empty list is OK for our use case) + # We override get_source anyway, so parent's searchpath is only used for list_templates + super().__init__(regular_paths if regular_paths else [], encoding, followlinks) + + # Store all paths for reference and error messages + self._all_searchpaths = all_paths + self.searchpath = all_paths + + @staticmethod + def _parse_zip_path(path: str) -> tuple[str, str] | None: + """ + Parse a path to extract zip archive and internal path components. + + :param path: The path to parse + :return: Tuple of (archive_path, internal_base_path) if path is a zip path, + None otherwise + """ + # Check if the path itself is a zip file (no internal path) + if path.endswith(".zip") and os.path.isfile(path) and zipfile.is_zipfile(path): Review Comment: Fixed. Added a shared `_has_zip_extension` helper with case-insensitive check, and updated `ZIP_REGEX` with r`e.IGNORECASE` to match `zipfile.is_zipfile()` behavior. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
