This is an automated email from the ASF dual-hosted git repository.

juergbi pushed a commit to branch juerg/tar
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 084851249a9f98a88d3e6927a7ae2d668f79f483
Author: Jürg Billeter <[email protected]>
AuthorDate: Fri Jul 12 14:47:21 2024 +0200

    tar.py: Always check member paths
    
    The paths of tarball members are already checked to guard against
    extraction outside the target directory in the common case. However, if
    `base-dir` is set to the empty string, the checks were skipped.
    
    This corrects the code to always check member paths. On Python 3.12+,
    this uses the new extraction filter support, which should also ensure
    consistent behavior on Python 3.14+, which will use the restrictive
    'data' filter by default.
---
 src/buildstream/plugins/sources/tar.py | 119 ++++++++++++++++++---------------
 1 file changed, 64 insertions(+), 55 deletions(-)

diff --git a/src/buildstream/plugins/sources/tar.py 
b/src/buildstream/plugins/sources/tar.py
index 5c45d1c94..87f86b25f 100644
--- a/src/buildstream/plugins/sources/tar.py
+++ b/src/buildstream/plugins/sources/tar.py
@@ -52,10 +52,13 @@ See :ref:`built-in functionality documentation 
<core_source_builtins>` for
 details on common configuration options for sources.
 """
 
+import functools
 import os
+import sys
 import tarfile
 from contextlib import contextmanager
 from tempfile import TemporaryFile
+from typing import Optional
 
 from buildstream import DownloadableFileSource, SourceError
 from buildstream import utils
@@ -130,54 +133,53 @@ class TarSource(DownloadableFileSource):
                 base_dir = None
                 if self.base_dir:
                     base_dir = self._find_base_dir(tar, self.base_dir)
+                    if not base_dir.endswith(os.sep):
+                        base_dir = base_dir + os.sep
 
-                def filter_non_dev(tarfiles):
-                    for file in tarfiles:
-                        if not file.isdev():
-                            yield file
-
-                if base_dir:
-                    tar.extractall(
-                        path=directory, 
members=filter_non_dev(self._extract_members(tar, base_dir, directory))
-                    )
+                filter_function = functools.partial(TarSource._extract_filter, 
base_dir)
+                if sys.version_info >= (3, 12):
+                    tar.extractall(path=directory, filter=filter_function)
                 else:
-                    tar.extractall(path=directory, 
members=filter_non_dev(tar.getmembers()))
+                    filtered_members = []
+                    for member in tar.getmembers():
+                        member = filter_function(member, directory)
+                        if member is not None:
+                            filtered_members.append(member)
+                    tar.extractall(path=directory, members=filtered_members)
 
         except (tarfile.TarError, OSError) as e:
             raise SourceError("{}: Error staging source: {}".format(self, e)) 
from e
 
-    # Override and translate which filenames to extract
-    def _extract_members(self, tar, base_dir, target_dir):
-
-        # Assert that a tarfile is safe to extract; specifically, make
-        # sure that we don't do anything outside of the target
-        # directory (this is possible, if, say, someone engineered a
-        # tarfile to contain paths that start with ..).
-        def assert_safe(member):
-            final_path = os.path.abspath(os.path.join(target_dir, member.path))
-            if not final_path.startswith(target_dir):
+    # Assert that a tarfile is safe to extract; specifically, make
+    # sure that we don't do anything outside of the target
+    # directory (this is possible, if, say, someone engineered a
+    # tarfile to contain paths that start with ..).
+    def _assert_safe(member: tarfile.TarInfo, target_dir: str):
+        final_path = os.path.abspath(os.path.join(target_dir, member.path))
+        if not final_path.startswith(target_dir):
+            raise SourceError(
+                "{}: Tarfile attempts to extract outside the staging area: "
+                "{} -> {}".format(self, member.path, final_path)
+            )
+
+        if member.islnk():
+            linked_path = os.path.abspath(os.path.join(target_dir, 
member.linkname))
+            if not linked_path.startswith(target_dir):
                 raise SourceError(
-                    "{}: Tarfile attempts to extract outside the staging area: 
"
+                    "{}: Tarfile attempts to hardlink outside the staging 
area: "
                     "{} -> {}".format(self, member.path, final_path)
                 )
 
-            if member.islnk():
-                linked_path = os.path.abspath(os.path.join(target_dir, 
member.linkname))
-                if not linked_path.startswith(target_dir):
-                    raise SourceError(
-                        "{}: Tarfile attempts to hardlink outside the staging 
area: "
-                        "{} -> {}".format(self, member.path, final_path)
-                    )
-
-            # Don't need to worry about symlinks because they're just
-            # files here and won't be able to do much harm once we are
-            # in a sandbox.
+        # Don't need to worry about symlinks because they're just
+        # files here and won't be able to do much harm once we are
+        # in a sandbox.
 
-        if not base_dir.endswith(os.sep):
-            base_dir = base_dir + os.sep
-
-        L = len(base_dir)
-        for member in tar.getmembers():
+    def _extract_filter(
+        base_dir: Optional[str], member: tarfile.TarInfo, target_dir: str
+    ) -> Optional[tarfile.TarInfo]:
+        if base_dir:
+            # Override and translate which filenames to extract
+            L = len(base_dir)
 
             # First, ensure that a member never starts with `./`
             if member.path.startswith("./"):
@@ -186,24 +188,31 @@ class TarSource(DownloadableFileSource):
                 member.linkname = member.linkname[2:]
 
             # Now extract only the paths which match the normalized path
-            if member.path.startswith(base_dir):
-                # Hardlinks are smart and collapse into the "original"
-                # when their counterpart doesn't exist. This means we
-                # only need to modify links to files whose location we
-                # change.
-                #
-                # Since we assert that we're not linking to anything
-                # outside the target directory, this should only ever
-                # be able to link to things inside the target
-                # directory, so we should cover all bases doing this.
-                #
-                if member.islnk() and member.linkname.startswith(base_dir):
-                    member.linkname = member.linkname[L:]
-
-                member.path = member.path[L:]
-
-                assert_safe(member)
-                yield member
+            if not member.path.startswith(base_dir):
+                return None
+
+            # Hardlinks are smart and collapse into the "original"
+            # when their counterpart doesn't exist. This means we
+            # only need to modify links to files whose location we
+            # change.
+            #
+            # Since we assert that we're not linking to anything
+            # outside the target directory, this should only ever
+            # be able to link to things inside the target
+            # directory, so we should cover all bases doing this.
+            #
+            if member.islnk() and member.linkname.startswith(base_dir):
+                member.linkname = member.linkname[L:]
+
+            member.path = member.path[L:]
+
+        TarSource._assert_safe(member, target_dir)
+
+        # Skip device nodes
+        if member.isdev():
+            return None
+
+        return member
 
     # We want to iterate over all paths of a tarball, but getmembers()
     # is not enough because some tarballs simply do not contain the leading

Reply via email to