This is an automated email from the ASF dual-hosted git repository.
sbp pushed a commit to branch sbp
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git
The following commit(s) were added to refs/heads/sbp by this push:
new 03114918 Add some simple archive checks for quarantined file validation
03114918 is described below
commit 0311491882d6e4696a1b74b9744e2ed8553c2054
Author: Sean B. Palmer <[email protected]>
AuthorDate: Tue Feb 24 15:51:50 2026 +0000
Add some simple archive checks for quarantined file validation
---
atr/detection.py | 48 +++++++++++++
tests/unit/test_detection.py | 165 +++++++++++++++++++++++++++++++++++++++++++
2 files changed, 213 insertions(+)
diff --git a/atr/detection.py b/atr/detection.py
index 3f17195c..745a6667 100644
--- a/atr/detection.py
+++ b/atr/detection.py
@@ -16,11 +16,14 @@
# under the License.
import pathlib
+import tarfile
+import zipfile
from typing import Final
import puremagic
import atr.models.attestable as models
+import atr.tarzip as tarzip
_BZIP2_TYPES: Final[set[str]] = {"application/x-bzip2"}
_DEB_TYPES: Final[set[str]] = {"application/vnd.debian.binary-package",
"application/x-archive"}
@@ -61,6 +64,23 @@ _QUARANTINE_ARCHIVE_SUFFIXES: Final[tuple[str, ...]] =
(".tar.gz", ".tgz", ".zip
_QUARANTINE_NORMALISED_SUFFIXES: Final[dict[str, str]] = {".tgz": ".tar.gz"}
+def check_archive_safety(archive_path: str) -> list[str]:
+ errors: list[str] = []
+ try:
+ with tarzip.open_archive(archive_path) as archive:
+ for member in archive:
+ if _archive_member_has_path_traversal(member.name):
+ errors.append(f"{member.name}: Archive member path
traversal is not allowed")
+
+ if (member.issym() or member.islnk()) and
_archive_link_escapes_root(member.name, member.linkname):
+ link_target = member.linkname or ""
+ errors.append(f"{member.name}: Archive link target escapes
root ({link_target})")
+ except (tarfile.TarError, zipfile.BadZipFile, ValueError,
tarzip.ArchiveMemberLimitExceededError) as e:
+ errors.append(f"Failed to read archive: {e}")
+
+ return errors
+
+
def detect_archives_requiring_quarantine(
path_to_hash: dict[str, str], previous_attestable: models.AttestableV1 |
None
) -> list[str]:
@@ -103,6 +123,34 @@ def validate_directory(directory: pathlib.Path) ->
list[str]:
return errors
+def _archive_link_escapes_root(member_name: str, link_target: str | None) ->
bool:
+ if link_target is None:
+ return False
+ if link_target.startswith("/"):
+ return True
+
+ link_parts = pathlib.PurePosixPath(link_target).parts
+ base_parts = pathlib.PurePosixPath(member_name).parent.parts
+ depth = 0
+ for part in (*base_parts, *link_parts):
+ if part in ("", ".", "/"):
+ continue
+ if part == "..":
+ if depth == 0:
+ return True
+ depth -= 1
+ else:
+ depth += 1
+ return False
+
+
+def _archive_member_has_path_traversal(path_key: str) -> bool:
+ if path_key.startswith("/"):
+ return True
+
+ return ".." in pathlib.PurePosixPath(path_key).parts
+
+
def _path_basename(path_key: str) -> str:
return path_key.rsplit("/", maxsplit=1)[-1]
diff --git a/tests/unit/test_detection.py b/tests/unit/test_detection.py
index 7a89ee46..6168e7d2 100644
--- a/tests/unit/test_detection.py
+++ b/tests/unit/test_detection.py
@@ -15,9 +15,124 @@
# specific language governing permissions and limitations
# under the License.
+import io
+import pathlib
+import tarfile
+import zipfile
+
import atr.detection as detection
import atr.models.attestable as models
+type TarArchiveEntry = tuple[str, str, bytes | str]
+
+
+def test_check_archive_safety_accepts_safe_tar_gz(tmp_path):
+ archive_path = tmp_path / "safe.tar.gz"
+ _write_tar_gz(
+ archive_path,
+ [
+ _tar_regular_file("dist/apache-widget-1.0-src.tar.gz", b"payload"),
+ _tar_regular_file("docs/readme.txt", b"hello"),
+ ],
+ )
+
+ assert detection.check_archive_safety(str(archive_path)) == []
+
+
+def test_check_archive_safety_accepts_safe_zip(tmp_path):
+ archive_path = tmp_path / "safe.zip"
+ _write_zip(
+ archive_path,
+ [
+ ("dist/apache-widget-1.0-src.zip", b"payload"),
+ ("docs/readme.txt", b"hello"),
+ ],
+ )
+
+ assert detection.check_archive_safety(str(archive_path)) == []
+
+
+def test_check_archive_safety_rejects_absolute_paths_in_tar_and_zip(tmp_path):
+ tar_path = tmp_path / "unsafe-absolute.tar.gz"
+ _write_tar_gz(
+ tar_path,
+ [
+ _tar_regular_file("/absolute.txt", b"x"),
+ ],
+ )
+ zip_path = tmp_path / "unsafe-absolute.zip"
+ _write_zip(
+ zip_path,
+ [
+ ("/absolute.txt", b"x"),
+ ],
+ )
+
+ tar_errors = detection.check_archive_safety(str(tar_path))
+ zip_errors = detection.check_archive_safety(str(zip_path))
+
+ assert any("/absolute.txt" in error for error in tar_errors)
+ assert any("path traversal" in error for error in tar_errors)
+ assert any("/absolute.txt" in error for error in zip_errors)
+ assert any("path traversal" in error for error in zip_errors)
+
+
+def test_check_archive_safety_rejects_hardlink_target_outside_root(tmp_path):
+ archive_path = tmp_path / "unsafe-hardlink.tar.gz"
+ _write_tar_gz(
+ archive_path,
+ [
+ _tar_regular_file("dist/file.txt", b"ok"),
+ _tar_hardlink("dist/hard", "../../outside.txt"),
+ ],
+ )
+
+ errors = detection.check_archive_safety(str(archive_path))
+
+ assert any("dist/hard" in error for error in errors)
+ assert any("escapes root" in error for error in errors)
+
+
+def
test_check_archive_safety_rejects_parent_path_traversal_in_tar_and_zip(tmp_path):
+ tar_path = tmp_path / "unsafe-parent.tar.gz"
+ _write_tar_gz(
+ tar_path,
+ [
+ _tar_regular_file("../outside.txt", b"x"),
+ ],
+ )
+ zip_path = tmp_path / "unsafe-parent.zip"
+ _write_zip(
+ zip_path,
+ [
+ ("../outside.txt", b"x"),
+ ],
+ )
+
+ tar_errors = detection.check_archive_safety(str(tar_path))
+ zip_errors = detection.check_archive_safety(str(zip_path))
+
+ assert any("../outside.txt" in error for error in tar_errors)
+ assert any("path traversal" in error for error in tar_errors)
+ assert any("../outside.txt" in error for error in zip_errors)
+ assert any("path traversal" in error for error in zip_errors)
+
+
+def test_check_archive_safety_rejects_symlink_target_outside_root(tmp_path):
+ archive_path = tmp_path / "unsafe-symlink.tar.gz"
+ _write_tar_gz(
+ archive_path,
+ [
+ _tar_regular_file("dist/file.txt", b"ok"),
+ _tar_symlink("dist/link", "../../outside.txt"),
+ ],
+ )
+
+ errors = detection.check_archive_safety(str(archive_path))
+
+ assert any("dist/link" in error for error in errors)
+ assert any("escapes root" in error for error in errors)
+
def
test_detect_archives_requiring_quarantine_known_hash_and_different_extension():
previous = models.AttestableV1(
@@ -126,3 +241,53 @@ def
test_detect_archives_requiring_quarantine_tgz_and_tar_gz_are_equivalent():
)
assert result == []
+
+
+def _tar_hardlink(name: str, link_target: str) -> TarArchiveEntry:
+ return ("hardlink", name, link_target)
+
+
+def _tar_regular_file(name: str, data: bytes) -> TarArchiveEntry:
+ return ("file", name, data)
+
+
+def _tar_symlink(name: str, link_target: str) -> TarArchiveEntry:
+ return ("symlink", name, link_target)
+
+
+def _write_tar_gz(archive_path: pathlib.Path, members: list[TarArchiveEntry])
-> None:
+ with tarfile.open(archive_path, "w:gz") as archive:
+ for member_type, member_name, member_data in members:
+ if member_type == "file":
+ if not isinstance(member_data, bytes):
+ raise ValueError("Tar regular file data must be bytes")
+ info = tarfile.TarInfo(member_name)
+ info.size = len(member_data)
+ archive.addfile(info, io.BytesIO(member_data))
+ continue
+
+ if member_type == "symlink":
+ if not isinstance(member_data, str):
+ raise ValueError("Tar symlink data must be a path string")
+ info = tarfile.TarInfo(member_name)
+ info.type = tarfile.SYMTYPE
+ info.linkname = member_data
+ archive.addfile(info)
+ continue
+
+ if member_type == "hardlink":
+ if not isinstance(member_data, str):
+ raise ValueError("Tar hardlink data must be a path string")
+ info = tarfile.TarInfo(member_name)
+ info.type = tarfile.LNKTYPE
+ info.linkname = member_data
+ archive.addfile(info)
+ continue
+
+ raise ValueError(f"Unsupported tar member type: {member_type}")
+
+
+def _write_zip(archive_path: pathlib.Path, members: list[tuple[str, bytes]])
-> None:
+ with zipfile.ZipFile(archive_path, "w") as archive:
+ for member_name, member_data in members:
+ archive.writestr(member_name, member_data)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]