This is an automated email from the ASF dual-hosted git repository.

sbp pushed a commit to branch sbp
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git


The following commit(s) were added to refs/heads/sbp by this push:
     new 65a03fbf Check .zip structure using the extracted files only
65a03fbf is described below

commit 65a03fbf981ea9cfff846566fb7ed5cdbe71051e
Author: Sean B. Palmer <[email protected]>
AuthorDate: Tue Mar 10 19:54:01 2026 +0000

    Check .zip structure using the extracted files only
---
 atr/tasks/checks/targz.py                |  10 +-
 atr/tasks/checks/zipformat.py            | 200 ++++++++++++++-----------------
 tests/unit/test_archive_member_limit.py  |  15 ---
 tests/unit/test_archive_root_variants.py |  69 ++++++-----
 4 files changed, 137 insertions(+), 157 deletions(-)

diff --git a/atr/tasks/checks/targz.py b/atr/tasks/checks/targz.py
index 02534032..302e5b3b 100644
--- a/atr/tasks/checks/targz.py
+++ b/atr/tasks/checks/targz.py
@@ -32,7 +32,7 @@ import atr.util as util
 # Release policy fields which this check relies on - used for result caching
 INPUT_POLICY_KEYS: Final[list[str]] = []
 INPUT_EXTRA_ARGS: Final[list[str]] = []
-CHECK_VERSION: Final[str] = "2"
+CHECK_VERSION: Final[str] = "3"
 
 
 class RootDirectoryError(Exception):
@@ -71,6 +71,14 @@ def root_directory(cache_dir: pathlib.Path) -> tuple[str, 
bytes | None]:
         raise RootDirectoryError(f"Multiple root directories found: 
{entries[0]}, {entries[1]}")
 
     root = entries[0]
+    root_path = cache_dir / root
+    try:
+        root_stat = root_path.lstat()
+    except OSError as e:
+        raise RootDirectoryError(f"Unable to inspect root entry '{root}': 
{e}") from e
+    if not stat.S_ISDIR(root_stat.st_mode):
+        raise RootDirectoryError(f"Root entry is not a directory: {root}")
+
     package_json: bytes | None = None
 
     if root == "package":
diff --git a/atr/tasks/checks/zipformat.py b/atr/tasks/checks/zipformat.py
index 74c13dee..df474bf9 100644
--- a/atr/tasks/checks/zipformat.py
+++ b/atr/tasks/checks/zipformat.py
@@ -17,6 +17,8 @@
 
 import asyncio
 import os
+import pathlib
+import stat
 import zipfile
 from typing import Any, Final
 
@@ -29,7 +31,7 @@ import atr.util as util
 # Release policy fields which this check relies on - used for result caching
 INPUT_POLICY_KEYS: Final[list[str]] = []
 INPUT_EXTRA_ARGS: Final[list[str]] = []
-CHECK_VERSION: Final[str] = "1"
+CHECK_VERSION: Final[str] = "2"
 
 
 async def integrity(args: checks.FunctionArguments) -> results.Results | None:
@@ -56,20 +58,30 @@ async def integrity(args: checks.FunctionArguments) -> 
results.Results | None:
 
 async def structure(args: checks.FunctionArguments) -> results.Results | None:
     """Check that the zip archive has a single root directory matching the 
artifact name."""
+    # TODO: We can merge this with the targz structure check
+    # Now that we migrated to extracted trees, they're very similar
+    # For simplicity, they've been updated separately for now
+    # (There are several small differences to resolve between the two)
     recorder = await args.recorder()
     if not (artifact_abs_path := await recorder.abs_path()):
         return None
     if await recorder.primary_path_is_binary():
         return None
 
+    cache_dir = await checks.resolve_cache_dir(args)
+    if cache_dir is None:
+        await recorder.failure(
+            "Extracted archive tree is not available",
+            {"rel_path": args.primary_rel_path},
+        )
+        return None
+
     log.info(f"Checking zip structure for {artifact_abs_path} (rel: 
{args.primary_rel_path})")
 
     try:
-        result_data = await asyncio.to_thread(_structure_check_core_logic, 
str(artifact_abs_path))
+        result_data = await asyncio.to_thread(_structure_check_core_logic, 
cache_dir, str(artifact_abs_path))
 
-        if result_data.get("warning"):
-            await recorder.failure(result_data["warning"], result_data)
-        elif result_data.get("error"):
+        if result_data.get("error"):
             await recorder.failure(result_data["error"], result_data)
         else:
             await recorder.success(f"Zip structure OK (root: 
{result_data['root_dir']})", result_data)
@@ -97,113 +109,85 @@ def _integrity_check_core_logic(artifact_path: str) -> 
dict[str, Any]:
         return {"error": f"Unexpected error: {e}"}
 
 
-def _structure_check_core_logic(artifact_path: str) -> dict[str, Any]:  # 
noqa: C901
+def _structure_check_core_logic(cache_dir: pathlib.Path, artifact_path: str) 
-> dict[str, Any]:
     """Verify the internal structure of the zip archive."""
-    try:
-        with tarzip.open_archive(artifact_path) as archive:
-            members: list[tarzip.Member] = []
-            package_json: bytes | None = None
-
-            for member in archive:
-                members.append(member)
-                if package_json is None:
-                    member_name = member.name.lstrip("./")
-                    if (member_name == "package/package.json") and 
member.isfile():
-                        size = member.size if hasattr(member, "size") else 0
-                        if (size > 0) and (size <= 
util.NPM_PACKAGE_JSON_MAX_SIZE):
-                            f = archive.extractfile(member)
-                            if f is not None:
-                                try:
-                                    package_json = f.read()
-                                finally:
-                                    f.close()
-
-            if not members:
-                return {"error": "Archive is empty"}
-
-            base_name = os.path.basename(artifact_path)
-            basename_from_filename = base_name.removesuffix(".zip")
-            expected_roots = 
util.permitted_archive_roots(basename_from_filename)
-
-            root_dirs, non_rooted_files = 
_structure_check_core_logic_find_roots(members)
-            member_names = [m.name for m in members]
-            actual_root, error_msg = _structure_check_core_logic_validate_root(
-                member_names, root_dirs, non_rooted_files, expected_roots
-            )
-
-            if error_msg is not None:
-                if (actual_root == "package") and (package_json is not None):
-                    npm_info, _ = util.parse_npm_pack_info(package_json, 
basename_from_filename)
-                    if npm_info is not None:
-                        npm_data: dict[str, Any] = {
-                            "root_dir": actual_root,
-                            "expected_roots": expected_roots,
-                            "npm_pack": {
-                                "name": npm_info.name,
-                                "version": npm_info.version,
-                                "filename_match": npm_info.filename_match,
-                            },
-                        }
-                        if npm_info.filename_match is False:
-                            npm_data["error"] = "npm pack layout detected but 
filename does not match package.json"
-                        return npm_data
-                result_data: dict[str, Any] = {"expected_roots": 
expected_roots}
-                if error_msg.startswith("Root directory mismatch"):
-                    result_data["error"] = error_msg
-                else:
-                    result_data["error"] = error_msg
-                return result_data
-            if actual_root:
-                return {"root_dir": actual_root, "expected_roots": 
expected_roots}
-            return {"error": "Unknown structure validation error"}
-
-    except tarzip.ArchiveMemberLimitExceededError as e:
-        return {"error": f"Archive has too many members: {e}"}
-    except zipfile.BadZipFile as e:
-        return {"error": f"Bad zip file: {e}"}
-    except FileNotFoundError:
-        return {"error": "File not found"}
-    except Exception as e:
-        return {"error": f"Unexpected error: {e}"}
+    # The ._ prefix is a metadata convention
+    entries = sorted(e for e in os.listdir(cache_dir) if not 
e.startswith("._"))
+    if not entries:
+        return {"error": "Archive is empty"}
+
+    base_name = os.path.basename(artifact_path)
+    basename_from_filename = base_name.removesuffix(".zip")
+    expected_roots = util.permitted_archive_roots(basename_from_filename)
+
+    root_dirs: list[str] = []
+    non_rooted_entries: list[str] = []
+    for entry in entries:
+        entry_path = cache_dir / entry
+        try:
+            entry_stat = entry_path.lstat()
+        except OSError as e:
+            return {"error": f"Unable to inspect archive root entry '{entry}': 
{e}", "expected_roots": expected_roots}
+
+        if stat.S_ISDIR(entry_stat.st_mode):
+            root_dirs.append(entry)
+        else:
+            non_rooted_entries.append(entry)
 
+    if non_rooted_entries:
+        return {"error": f"Files found directly in root: 
{non_rooted_entries}", "expected_roots": expected_roots}
 
-def _structure_check_core_logic_find_roots(members: list[tarzip.Member]) -> 
tuple[set[str], list[str]]:
-    """Identify root directories and non-rooted files in a zip archive."""
-    root_dirs: set[str] = set()
-    non_rooted_files: list[str] = []
-    for member in members:
-        if "/" in member.name:
-            root_dirs.add(member.name.split("/", 1)[0])
-        elif not member.isdir():
-            non_rooted_files.append(member.name)
-    return root_dirs, non_rooted_files
-
-
-def _structure_check_core_logic_validate_root(
-    members: list[str], root_dirs: set[str], non_rooted_files: list[str], 
expected_roots: list[str]
-) -> tuple[str | None, str | None]:
-    """Validate the identified root structure against expectations."""
-    if non_rooted_files:
-        return None, f"Files found directly in root: {non_rooted_files}"
     if not root_dirs:
-        return None, "No directories found in archive"
+        return {"error": "No directories found in archive", "expected_roots": 
expected_roots}
     if len(root_dirs) > 1:
-        return None, f"Multiple root directories found: 
{sorted(list(root_dirs))}"
-
-    actual_root = next(iter(root_dirs))
-    if actual_root not in expected_roots:
-        expected_roots_display = "', '".join(expected_roots)
-        return (
-            actual_root,
-            f"Root directory mismatch. Expected one of 
'{expected_roots_display}', found '{actual_root}'",
-        )
+        return {"error": f"Multiple root directories found: {root_dirs}", 
"expected_roots": expected_roots}
+
+    actual_root = root_dirs[0]
+    if actual_root in expected_roots:
+        return {"root_dir": actual_root, "expected_roots": expected_roots}
 
-    # Check whether all members are under the correct root directory
-    expected_prefix = f"{actual_root.rstrip('/')}/"
-    for member in members:
-        if member == actual_root.rstrip("/"):
-            continue
-        if not member.startswith(expected_prefix):
-            return None, f"Member found outside expected root directory: 
{member}"
+    if (actual_root == "package") and (
+        npm_result := _structure_npm_result(cache_dir, basename_from_filename, 
actual_root, expected_roots)
+    ):
+        return npm_result
+
+    expected_roots_display = "', '".join(expected_roots)
+    return {
+        "error": f"Root directory mismatch. Expected one of 
'{expected_roots_display}', found '{actual_root}'",
+        "expected_roots": expected_roots,
+    }
+
+
+def _structure_npm_result(
+    cache_dir: pathlib.Path, basename_from_filename: str, actual_root: str, 
expected_roots: list[str]
+) -> dict[str, Any] | None:
+    package_json_path = cache_dir / "package" / "package.json"
+    try:
+        package_json_stat = package_json_path.lstat()
+    except (FileNotFoundError, OSError):
+        return None
+
+    if not stat.S_ISREG(package_json_stat.st_mode):
+        return None
+
+    size = package_json_stat.st_size
+    if (size <= 0) or (size > util.NPM_PACKAGE_JSON_MAX_SIZE):
+        return None
+
+    package_json = package_json_path.read_bytes()
+    npm_info, _ = util.parse_npm_pack_info(package_json, 
basename_from_filename)
+    if npm_info is None:
+        return None
 
-    return actual_root, None
+    npm_data: dict[str, Any] = {
+        "root_dir": actual_root,
+        "expected_roots": expected_roots,
+        "npm_pack": {
+            "name": npm_info.name,
+            "version": npm_info.version,
+            "filename_match": npm_info.filename_match,
+        },
+    }
+    if npm_info.filename_match is False:
+        npm_data["error"] = "npm pack layout detected but filename does not 
match package.json"
+    return npm_data
diff --git a/tests/unit/test_archive_member_limit.py 
b/tests/unit/test_archive_member_limit.py
index b2ae672b..e22e50cb 100644
--- a/tests/unit/test_archive_member_limit.py
+++ b/tests/unit/test_archive_member_limit.py
@@ -157,21 +157,6 @@ def 
test_zipformat_integrity_reports_member_limit(tmp_path, monkeypatch):
     assert "too many members" in result.get("error", "").lower()
 
 
-def test_zipformat_structure_reports_member_limit(tmp_path, monkeypatch):
-    archive_path = tmp_path / "sample.zip"
-    _make_zip(archive_path, ["a.txt", "b.txt", "c.txt"])
-
-    original_open = tarzip.open_archive
-
-    def limited_open(path: str, *args, **kwargs):
-        return original_open(path, max_members=2)
-
-    monkeypatch.setattr(tarzip, "open_archive", limited_open)
-
-    result = zipformat._structure_check_core_logic(str(archive_path))
-    assert "too many members" in result.get("error", "").lower()
-
-
 async def _args_for(recorder: recorders.RecorderStub) -> 
checks.FunctionArguments:
     return checks.FunctionArguments(
         recorder=recorders.get_recorder(recorder),
diff --git a/tests/unit/test_archive_root_variants.py 
b/tests/unit/test_archive_root_variants.py
index 7b7b56f2..07457a3d 100644
--- a/tests/unit/test_archive_root_variants.py
+++ b/tests/unit/test_archive_root_variants.py
@@ -18,7 +18,6 @@
 import json
 import pathlib
 import unittest.mock as mock
-import zipfile
 
 import pytest
 
@@ -158,6 +157,19 @@ async def 
test_targz_structure_rejects_src_root_when_filename_has_source_suffix(
     assert any(status == sql.CheckResultStatus.FAILURE.value for status, _, _ 
in recorder.messages)
 
 
[email protected]
+async def test_targz_structure_rejects_symlink_root(tmp_path: pathlib.Path) -> 
None:
+    cache_dir = tmp_path / "cache"
+    cache_dir.mkdir()
+    (cache_dir / "apache-example-1.2.3").symlink_to(cache_dir / 
"missing-target")
+    recorder, args = await _targz_structure_args(tmp_path, 
"apache-example-1.2.3.tar.gz")
+
+    with mock.patch.object(checks, "resolve_cache_dir", 
new=mock.AsyncMock(return_value=cache_dir)):
+        await targz.structure(args)
+
+    assert any(status == sql.CheckResultStatus.FAILURE.value for status, _, _ 
in recorder.messages)
+
+
 @pytest.mark.asyncio
 async def test_targz_structure_rejects_symlinked_package_json(tmp_path: 
pathlib.Path) -> None:
     cache_dir = _make_cache_tree_with_contents(
@@ -177,54 +189,48 @@ async def 
test_targz_structure_rejects_symlinked_package_json(tmp_path: pathlib.
 
 
 def test_zipformat_structure_accepts_npm_pack_root(tmp_path: pathlib.Path) -> 
None:
-    archive_path = tmp_path / "example-1.2.3.zip"
-    _make_zip_with_contents(
-        archive_path,
+    cache_dir = _make_cache_tree_with_contents(
+        tmp_path,
         {
             "package/package.json": json.dumps({"name": "example", "version": 
"1.2.3"}),
             "package/README.txt": "hello",
         },
     )
 
-    result = zipformat._structure_check_core_logic(str(archive_path))
+    result = zipformat._structure_check_core_logic(cache_dir, 
"example-1.2.3.zip")
 
     assert result.get("error") is None
-    assert result.get("warning") is None
     assert result.get("root_dir") == "package"
 
 
 def test_zipformat_structure_accepts_src_suffix_variant(tmp_path: 
pathlib.Path) -> None:
-    archive_path = tmp_path / "apache-example-1.2.3-src.zip"
-    _make_zip(archive_path, ["apache-example-1.2.3/README.txt"])
+    cache_dir = _make_cache_tree(tmp_path, ["apache-example-1.2.3/README.txt"])
 
-    result = zipformat._structure_check_core_logic(str(archive_path))
+    result = zipformat._structure_check_core_logic(cache_dir, 
"apache-example-1.2.3-src.zip")
 
     assert result.get("error") is None
-    assert result.get("warning") is None
     assert result.get("root_dir") == "apache-example-1.2.3"
 
 
 def test_zipformat_structure_rejects_dated_src_suffix(tmp_path: pathlib.Path) 
-> None:
-    archive_path = tmp_path / "apache-example-1.2.3-src-20251202.zip"
-    _make_zip(archive_path, ["apache-example-1.2.3/README.txt"])
+    cache_dir = _make_cache_tree(tmp_path, ["apache-example-1.2.3/README.txt"])
 
-    result = zipformat._structure_check_core_logic(str(archive_path))
+    result = zipformat._structure_check_core_logic(cache_dir, 
"apache-example-1.2.3-src-20251202.zip")
 
     assert "error" in result
     assert "Root directory mismatch" in result["error"]
 
 
 def test_zipformat_structure_rejects_npm_pack_filename_mismatch(tmp_path: 
pathlib.Path) -> None:
-    archive_path = tmp_path / "example-1.2.3.zip"
-    _make_zip_with_contents(
-        archive_path,
+    cache_dir = _make_cache_tree_with_contents(
+        tmp_path,
         {
             "package/package.json": json.dumps({"name": "different", 
"version": "1.2.3"}),
             "package/README.txt": "hello",
         },
     )
 
-    result = zipformat._structure_check_core_logic(str(archive_path))
+    result = zipformat._structure_check_core_logic(cache_dir, 
"example-1.2.3.zip")
 
     assert result.get("error") is not None
     assert "npm pack layout detected" in result["error"]
@@ -232,20 +238,29 @@ def 
test_zipformat_structure_rejects_npm_pack_filename_mismatch(tmp_path: pathli
 
 
 def 
test_zipformat_structure_rejects_package_root_without_package_json(tmp_path: 
pathlib.Path) -> None:
-    archive_path = tmp_path / "example-1.2.3.zip"
-    _make_zip_with_contents(
-        archive_path,
+    cache_dir = _make_cache_tree_with_contents(
+        tmp_path,
         {
             "package/README.txt": "hello",
         },
     )
 
-    result = zipformat._structure_check_core_logic(str(archive_path))
+    result = zipformat._structure_check_core_logic(cache_dir, 
"example-1.2.3.zip")
 
     assert result.get("error") is not None
     assert "Root directory mismatch" in result["error"]
 
 
+def test_zipformat_structure_rejects_top_level_symlink(tmp_path: pathlib.Path) 
-> None:
+    cache_dir = _make_cache_tree(tmp_path, ["example-1.2.3/README.txt"])
+    (cache_dir / "link").symlink_to(cache_dir / "missing-target")
+
+    result = zipformat._structure_check_core_logic(cache_dir, 
"example-1.2.3.zip")
+
+    assert result.get("error") is not None
+    assert "Files found directly in root" in result["error"]
+
+
 def _make_cache_tree(base: pathlib.Path, members: list[str]) -> pathlib.Path:
     """Create a directory tree simulating the quarantine extraction cache."""
     cache_dir = base / "cache"
@@ -268,18 +283,6 @@ def _make_cache_tree_with_contents(base: pathlib.Path, 
members: dict[str, str])
     return cache_dir
 
 
-def _make_zip(path: pathlib.Path, members: list[str]) -> None:
-    with zipfile.ZipFile(path, "w") as zf:
-        for name in members:
-            zf.writestr(name, f"data-{name}")
-
-
-def _make_zip_with_contents(path: pathlib.Path, members: dict[str, str]) -> 
None:
-    with zipfile.ZipFile(path, "w") as zf:
-        for name, content in members.items():
-            zf.writestr(name, content)
-
-
 async def _targz_structure_args(
     tmp_path: pathlib.Path, archive_filename: str
 ) -> tuple[recorders.RecorderStub, checks.FunctionArguments]:


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to