This is an automated email from the ASF dual-hosted git repository. juergbi pushed a commit to branch juerg/tar in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 084851249a9f98a88d3e6927a7ae2d668f79f483 Author: Jürg Billeter <[email protected]> AuthorDate: Fri Jul 12 14:47:21 2024 +0200 tar.py: Always check member paths The paths of tarball members are already checked to guard against extraction outside the target directory in the common case. However, if `base-dir` is set to the empty string, the checks were skipped. This corrects the code to always check member paths. On Python 3.12+, this uses the new extraction filter support, which should also ensure consistent behavior on Python 3.14+, which will use the restrictive 'data' filter by default. --- src/buildstream/plugins/sources/tar.py | 119 ++++++++++++++++++--------------- 1 file changed, 64 insertions(+), 55 deletions(-) diff --git a/src/buildstream/plugins/sources/tar.py b/src/buildstream/plugins/sources/tar.py index 5c45d1c94..87f86b25f 100644 --- a/src/buildstream/plugins/sources/tar.py +++ b/src/buildstream/plugins/sources/tar.py @@ -52,10 +52,13 @@ See :ref:`built-in functionality doumentation <core_source_builtins>` for details on common configuration options for sources. """ +import functools import os +import sys import tarfile from contextlib import contextmanager from tempfile import TemporaryFile +from typing import Optional from buildstream import DownloadableFileSource, SourceError from buildstream import utils @@ -130,54 +133,53 @@ class TarSource(DownloadableFileSource): base_dir = None if self.base_dir: base_dir = self._find_base_dir(tar, self.base_dir) + if not base_dir.endswith(os.sep): + base_dir = base_dir + os.sep - def filter_non_dev(tarfiles): - for file in tarfiles: - if not file.isdev(): - yield file - - if base_dir: - tar.extractall( - path=directory, members=filter_non_dev(self._extract_members(tar, base_dir, directory)) - ) + filter_function = functools.partial(TarSource._extract_filter, base_dir) + if sys.version_info >= (3, 12): + tar.extractall(path=directory, filter=filter_function) else: - tar.extractall(path=directory, members=filter_non_dev(tar.getmembers())) + filtered_members = [] + for member in tar.getmembers(): + member = filter_function(member, directory) + if member is not None: + filtered_members.append(member) + tar.extractall(path=directory, members=filtered_members) except (tarfile.TarError, OSError) as e: raise SourceError("{}: Error staging source: {}".format(self, e)) from e - # Override and translate which filenames to extract - def _extract_members(self, tar, base_dir, target_dir): - - # Assert that a tarfile is safe to extract; specifically, make - # sure that we don't do anything outside of the target - # directory (this is possible, if, say, someone engineered a - # tarfile to contain paths that start with ..). - def assert_safe(member): - final_path = os.path.abspath(os.path.join(target_dir, member.path)) - if not final_path.startswith(target_dir): + # Assert that a tarfile is safe to extract; specifically, make + # sure that we don't do anything outside of the target + # directory (this is possible, if, say, someone engineered a + # tarfile to contain paths that start with ..). + def _assert_safe(member: tarfile.TarInfo, target_dir: str): + final_path = os.path.abspath(os.path.join(target_dir, member.path)) + if not final_path.startswith(target_dir): + raise SourceError( + "{}: Tarfile attempts to extract outside the staging area: " + "{} -> {}".format(self, member.path, final_path) + ) + + if member.islnk(): + linked_path = os.path.abspath(os.path.join(target_dir, member.linkname)) + if not linked_path.startswith(target_dir): raise SourceError( - "{}: Tarfile attempts to extract outside the staging area: " + "{}: Tarfile attempts to hardlink outside the staging area: " "{} -> {}".format(self, member.path, final_path) ) - if member.islnk(): - linked_path = os.path.abspath(os.path.join(target_dir, member.linkname)) - if not linked_path.startswith(target_dir): - raise SourceError( - "{}: Tarfile attempts to hardlink outside the staging area: " - "{} -> {}".format(self, member.path, final_path) - ) - - # Don't need to worry about symlinks because they're just - # files here and won't be able to do much harm once we are - # in a sandbox. + # Don't need to worry about symlinks because they're just + # files here and won't be able to do much harm once we are + # in a sandbox. - if not base_dir.endswith(os.sep): - base_dir = base_dir + os.sep - - L = len(base_dir) - for member in tar.getmembers(): + def _extract_filter( + base_dir: Optional[str], member: tarfile.TarInfo, target_dir: str + ) -> Optional[tarfile.TarInfo]: + if base_dir: + # Override and translate which filenames to extract + L = len(base_dir) # First, ensure that a member never starts with `./` if member.path.startswith("./"): @@ -186,24 +188,31 @@ class TarSource(DownloadableFileSource): member.linkname = member.linkname[2:] # Now extract only the paths which match the normalized path - if member.path.startswith(base_dir): - # Hardlinks are smart and collapse into the "original" - # when their counterpart doesn't exist. This means we - # only need to modify links to files whose location we - # change. - # - # Since we assert that we're not linking to anything - # outside the target directory, this should only ever - # be able to link to things inside the target - # directory, so we should cover all bases doing this. - # - if member.islnk() and member.linkname.startswith(base_dir): - member.linkname = member.linkname[L:] - - member.path = member.path[L:] - - assert_safe(member) - yield member + if not member.path.startswith(base_dir): + return None + + # Hardlinks are smart and collapse into the "original" + # when their counterpart doesn't exist. This means we + # only need to modify links to files whose location we + # change. + # + # Since we assert that we're not linking to anything + # outside the target directory, this should only ever + # be able to link to things inside the target + # directory, so we should cover all bases doing this. + # + if member.islnk() and member.linkname.startswith(base_dir): + member.linkname = member.linkname[L:] + + member.path = member.path[L:] + + TarSource._assert_safe(member, target_dir) + + # Skip device nodes + if member.isdev(): + return None + + return member # We want to iterate over all paths of a tarball, but getmembers() # is not enough because some tarballs simply do not contain the leading
