This is an automated email from the ASF dual-hosted git repository.
sbp pushed a commit to branch sbp
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git
The following commit(s) were added to refs/heads/sbp by this push:
new 65a03fbf Check .zip structure using the extracted files only
65a03fbf is described below
commit 65a03fbf981ea9cfff846566fb7ed5cdbe71051e
Author: Sean B. Palmer <[email protected]>
AuthorDate: Tue Mar 10 19:54:01 2026 +0000
Check .zip structure using the extracted files only
---
atr/tasks/checks/targz.py | 10 +-
atr/tasks/checks/zipformat.py | 200 ++++++++++++++-----------------
tests/unit/test_archive_member_limit.py | 15 ---
tests/unit/test_archive_root_variants.py | 69 ++++++-----
4 files changed, 137 insertions(+), 157 deletions(-)
diff --git a/atr/tasks/checks/targz.py b/atr/tasks/checks/targz.py
index 02534032..302e5b3b 100644
--- a/atr/tasks/checks/targz.py
+++ b/atr/tasks/checks/targz.py
@@ -32,7 +32,7 @@ import atr.util as util
# Release policy fields which this check relies on - used for result caching
INPUT_POLICY_KEYS: Final[list[str]] = []
INPUT_EXTRA_ARGS: Final[list[str]] = []
-CHECK_VERSION: Final[str] = "2"
+CHECK_VERSION: Final[str] = "3"
class RootDirectoryError(Exception):
@@ -71,6 +71,14 @@ def root_directory(cache_dir: pathlib.Path) -> tuple[str, bytes | None]:
raise RootDirectoryError(f"Multiple root directories found: {entries[0]}, {entries[1]}")
root = entries[0]
+ root_path = cache_dir / root
+ try:
+ root_stat = root_path.lstat()
+ except OSError as e:
+ raise RootDirectoryError(f"Unable to inspect root entry '{root}': {e}") from e
+ if not stat.S_ISDIR(root_stat.st_mode):
+ raise RootDirectoryError(f"Root entry is not a directory: {root}")
+
package_json: bytes | None = None
if root == "package":
diff --git a/atr/tasks/checks/zipformat.py b/atr/tasks/checks/zipformat.py
index 74c13dee..df474bf9 100644
--- a/atr/tasks/checks/zipformat.py
+++ b/atr/tasks/checks/zipformat.py
@@ -17,6 +17,8 @@
import asyncio
import os
+import pathlib
+import stat
import zipfile
from typing import Any, Final
@@ -29,7 +31,7 @@ import atr.util as util
# Release policy fields which this check relies on - used for result caching
INPUT_POLICY_KEYS: Final[list[str]] = []
INPUT_EXTRA_ARGS: Final[list[str]] = []
-CHECK_VERSION: Final[str] = "1"
+CHECK_VERSION: Final[str] = "2"
async def integrity(args: checks.FunctionArguments) -> results.Results | None:
@@ -56,20 +58,30 @@ async def integrity(args: checks.FunctionArguments) -> results.Results | None:
async def structure(args: checks.FunctionArguments) -> results.Results | None:
"""Check that the zip archive has a single root directory matching the artifact name."""
+ # TODO: We can merge this with the targz structure check
+ # Now that we migrated to extracted trees, they're very similar
+ # For simplicity, they've been updated separately for now
+ # (There are several small differences to resolve between the two)
recorder = await args.recorder()
if not (artifact_abs_path := await recorder.abs_path()):
return None
if await recorder.primary_path_is_binary():
return None
+ cache_dir = await checks.resolve_cache_dir(args)
+ if cache_dir is None:
+ await recorder.failure(
+ "Extracted archive tree is not available",
+ {"rel_path": args.primary_rel_path},
+ )
+ return None
+
log.info(f"Checking zip structure for {artifact_abs_path} (rel: {args.primary_rel_path})")
try:
- result_data = await asyncio.to_thread(_structure_check_core_logic,
str(artifact_abs_path))
+ result_data = await asyncio.to_thread(_structure_check_core_logic, cache_dir, str(artifact_abs_path))
- if result_data.get("warning"):
- await recorder.failure(result_data["warning"], result_data)
- elif result_data.get("error"):
+ if result_data.get("error"):
await recorder.failure(result_data["error"], result_data)
else:
await recorder.success(f"Zip structure OK (root: {result_data['root_dir']})", result_data)
@@ -97,113 +109,85 @@ def _integrity_check_core_logic(artifact_path: str) ->
dict[str, Any]:
return {"error": f"Unexpected error: {e}"}
-def _structure_check_core_logic(artifact_path: str) -> dict[str, Any]: #
noqa: C901
+def _structure_check_core_logic(cache_dir: pathlib.Path, artifact_path: str) -> dict[str, Any]:
"""Verify the internal structure of the zip archive."""
- try:
- with tarzip.open_archive(artifact_path) as archive:
- members: list[tarzip.Member] = []
- package_json: bytes | None = None
-
- for member in archive:
- members.append(member)
- if package_json is None:
- member_name = member.name.lstrip("./")
- if (member_name == "package/package.json") and
member.isfile():
- size = member.size if hasattr(member, "size") else 0
- if (size > 0) and (size <=
util.NPM_PACKAGE_JSON_MAX_SIZE):
- f = archive.extractfile(member)
- if f is not None:
- try:
- package_json = f.read()
- finally:
- f.close()
-
- if not members:
- return {"error": "Archive is empty"}
-
- base_name = os.path.basename(artifact_path)
- basename_from_filename = base_name.removesuffix(".zip")
- expected_roots =
util.permitted_archive_roots(basename_from_filename)
-
- root_dirs, non_rooted_files =
_structure_check_core_logic_find_roots(members)
- member_names = [m.name for m in members]
- actual_root, error_msg = _structure_check_core_logic_validate_root(
- member_names, root_dirs, non_rooted_files, expected_roots
- )
-
- if error_msg is not None:
- if (actual_root == "package") and (package_json is not None):
- npm_info, _ = util.parse_npm_pack_info(package_json,
basename_from_filename)
- if npm_info is not None:
- npm_data: dict[str, Any] = {
- "root_dir": actual_root,
- "expected_roots": expected_roots,
- "npm_pack": {
- "name": npm_info.name,
- "version": npm_info.version,
- "filename_match": npm_info.filename_match,
- },
- }
- if npm_info.filename_match is False:
- npm_data["error"] = "npm pack layout detected but
filename does not match package.json"
- return npm_data
- result_data: dict[str, Any] = {"expected_roots":
expected_roots}
- if error_msg.startswith("Root directory mismatch"):
- result_data["error"] = error_msg
- else:
- result_data["error"] = error_msg
- return result_data
- if actual_root:
- return {"root_dir": actual_root, "expected_roots":
expected_roots}
- return {"error": "Unknown structure validation error"}
-
- except tarzip.ArchiveMemberLimitExceededError as e:
- return {"error": f"Archive has too many members: {e}"}
- except zipfile.BadZipFile as e:
- return {"error": f"Bad zip file: {e}"}
- except FileNotFoundError:
- return {"error": "File not found"}
- except Exception as e:
- return {"error": f"Unexpected error: {e}"}
+ # The ._ prefix is a metadata convention
+ entries = sorted(e for e in os.listdir(cache_dir) if not e.startswith("._"))
+ if not entries:
+ return {"error": "Archive is empty"}
+
+ base_name = os.path.basename(artifact_path)
+ basename_from_filename = base_name.removesuffix(".zip")
+ expected_roots = util.permitted_archive_roots(basename_from_filename)
+
+ root_dirs: list[str] = []
+ non_rooted_entries: list[str] = []
+ for entry in entries:
+ entry_path = cache_dir / entry
+ try:
+ entry_stat = entry_path.lstat()
+ except OSError as e:
+ return {"error": f"Unable to inspect archive root entry '{entry}': {e}", "expected_roots": expected_roots}
+
+ if stat.S_ISDIR(entry_stat.st_mode):
+ root_dirs.append(entry)
+ else:
+ non_rooted_entries.append(entry)
+ if non_rooted_entries:
+ return {"error": f"Files found directly in root: {non_rooted_entries}", "expected_roots": expected_roots}
-def _structure_check_core_logic_find_roots(members: list[tarzip.Member]) ->
tuple[set[str], list[str]]:
- """Identify root directories and non-rooted files in a zip archive."""
- root_dirs: set[str] = set()
- non_rooted_files: list[str] = []
- for member in members:
- if "/" in member.name:
- root_dirs.add(member.name.split("/", 1)[0])
- elif not member.isdir():
- non_rooted_files.append(member.name)
- return root_dirs, non_rooted_files
-
-
-def _structure_check_core_logic_validate_root(
- members: list[str], root_dirs: set[str], non_rooted_files: list[str],
expected_roots: list[str]
-) -> tuple[str | None, str | None]:
- """Validate the identified root structure against expectations."""
- if non_rooted_files:
- return None, f"Files found directly in root: {non_rooted_files}"
if not root_dirs:
- return None, "No directories found in archive"
+ return {"error": "No directories found in archive", "expected_roots": expected_roots}
if len(root_dirs) > 1:
- return None, f"Multiple root directories found:
{sorted(list(root_dirs))}"
-
- actual_root = next(iter(root_dirs))
- if actual_root not in expected_roots:
- expected_roots_display = "', '".join(expected_roots)
- return (
- actual_root,
- f"Root directory mismatch. Expected one of
'{expected_roots_display}', found '{actual_root}'",
- )
+ return {"error": f"Multiple root directories found: {root_dirs}", "expected_roots": expected_roots}
+
+ actual_root = root_dirs[0]
+ if actual_root in expected_roots:
+ return {"root_dir": actual_root, "expected_roots": expected_roots}
- # Check whether all members are under the correct root directory
- expected_prefix = f"{actual_root.rstrip('/')}/"
- for member in members:
- if member == actual_root.rstrip("/"):
- continue
- if not member.startswith(expected_prefix):
- return None, f"Member found outside expected root directory:
{member}"
+ if (actual_root == "package") and (
+ npm_result := _structure_npm_result(cache_dir, basename_from_filename, actual_root, expected_roots)
+ ):
+ return npm_result
+
+ expected_roots_display = "', '".join(expected_roots)
+ return {
+ "error": f"Root directory mismatch. Expected one of '{expected_roots_display}', found '{actual_root}'",
+ "expected_roots": expected_roots,
+ }
+
+
+def _structure_npm_result(
+ cache_dir: pathlib.Path, basename_from_filename: str, actual_root: str, expected_roots: list[str]
+) -> dict[str, Any] | None:
+ package_json_path = cache_dir / "package" / "package.json"
+ try:
+ package_json_stat = package_json_path.lstat()
+ except (FileNotFoundError, OSError):
+ return None
+
+ if not stat.S_ISREG(package_json_stat.st_mode):
+ return None
+
+ size = package_json_stat.st_size
+ if (size <= 0) or (size > util.NPM_PACKAGE_JSON_MAX_SIZE):
+ return None
+
+ package_json = package_json_path.read_bytes()
+ npm_info, _ = util.parse_npm_pack_info(package_json, basename_from_filename)
+ if npm_info is None:
+ return None
- return actual_root, None
+ npm_data: dict[str, Any] = {
+ "root_dir": actual_root,
+ "expected_roots": expected_roots,
+ "npm_pack": {
+ "name": npm_info.name,
+ "version": npm_info.version,
+ "filename_match": npm_info.filename_match,
+ },
+ }
+ if npm_info.filename_match is False:
+ npm_data["error"] = "npm pack layout detected but filename does not match package.json"
+ return npm_data
diff --git a/tests/unit/test_archive_member_limit.py
b/tests/unit/test_archive_member_limit.py
index b2ae672b..e22e50cb 100644
--- a/tests/unit/test_archive_member_limit.py
+++ b/tests/unit/test_archive_member_limit.py
@@ -157,21 +157,6 @@ def
test_zipformat_integrity_reports_member_limit(tmp_path, monkeypatch):
assert "too many members" in result.get("error", "").lower()
-def test_zipformat_structure_reports_member_limit(tmp_path, monkeypatch):
- archive_path = tmp_path / "sample.zip"
- _make_zip(archive_path, ["a.txt", "b.txt", "c.txt"])
-
- original_open = tarzip.open_archive
-
- def limited_open(path: str, *args, **kwargs):
- return original_open(path, max_members=2)
-
- monkeypatch.setattr(tarzip, "open_archive", limited_open)
-
- result = zipformat._structure_check_core_logic(str(archive_path))
- assert "too many members" in result.get("error", "").lower()
-
-
async def _args_for(recorder: recorders.RecorderStub) ->
checks.FunctionArguments:
return checks.FunctionArguments(
recorder=recorders.get_recorder(recorder),
diff --git a/tests/unit/test_archive_root_variants.py
b/tests/unit/test_archive_root_variants.py
index 7b7b56f2..07457a3d 100644
--- a/tests/unit/test_archive_root_variants.py
+++ b/tests/unit/test_archive_root_variants.py
@@ -18,7 +18,6 @@
import json
import pathlib
import unittest.mock as mock
-import zipfile
import pytest
@@ -158,6 +157,19 @@ async def
test_targz_structure_rejects_src_root_when_filename_has_source_suffix(
assert any(status == sql.CheckResultStatus.FAILURE.value for status, _, _
in recorder.messages)
+@pytest.mark.asyncio
+async def test_targz_structure_rejects_symlink_root(tmp_path: pathlib.Path) ->
None:
+ cache_dir = tmp_path / "cache"
+ cache_dir.mkdir()
+ (cache_dir / "apache-example-1.2.3").symlink_to(cache_dir /
"missing-target")
+ recorder, args = await _targz_structure_args(tmp_path,
"apache-example-1.2.3.tar.gz")
+
+ with mock.patch.object(checks, "resolve_cache_dir",
new=mock.AsyncMock(return_value=cache_dir)):
+ await targz.structure(args)
+
+ assert any(status == sql.CheckResultStatus.FAILURE.value for status, _, _
in recorder.messages)
+
+
@pytest.mark.asyncio
async def test_targz_structure_rejects_symlinked_package_json(tmp_path:
pathlib.Path) -> None:
cache_dir = _make_cache_tree_with_contents(
@@ -177,54 +189,48 @@ async def
test_targz_structure_rejects_symlinked_package_json(tmp_path: pathlib.
def test_zipformat_structure_accepts_npm_pack_root(tmp_path: pathlib.Path) ->
None:
- archive_path = tmp_path / "example-1.2.3.zip"
- _make_zip_with_contents(
- archive_path,
+ cache_dir = _make_cache_tree_with_contents(
+ tmp_path,
{
"package/package.json": json.dumps({"name": "example", "version":
"1.2.3"}),
"package/README.txt": "hello",
},
)
- result = zipformat._structure_check_core_logic(str(archive_path))
+ result = zipformat._structure_check_core_logic(cache_dir,
"example-1.2.3.zip")
assert result.get("error") is None
- assert result.get("warning") is None
assert result.get("root_dir") == "package"
def test_zipformat_structure_accepts_src_suffix_variant(tmp_path:
pathlib.Path) -> None:
- archive_path = tmp_path / "apache-example-1.2.3-src.zip"
- _make_zip(archive_path, ["apache-example-1.2.3/README.txt"])
+ cache_dir = _make_cache_tree(tmp_path, ["apache-example-1.2.3/README.txt"])
- result = zipformat._structure_check_core_logic(str(archive_path))
+ result = zipformat._structure_check_core_logic(cache_dir,
"apache-example-1.2.3-src.zip")
assert result.get("error") is None
- assert result.get("warning") is None
assert result.get("root_dir") == "apache-example-1.2.3"
def test_zipformat_structure_rejects_dated_src_suffix(tmp_path: pathlib.Path)
-> None:
- archive_path = tmp_path / "apache-example-1.2.3-src-20251202.zip"
- _make_zip(archive_path, ["apache-example-1.2.3/README.txt"])
+ cache_dir = _make_cache_tree(tmp_path, ["apache-example-1.2.3/README.txt"])
- result = zipformat._structure_check_core_logic(str(archive_path))
+ result = zipformat._structure_check_core_logic(cache_dir,
"apache-example-1.2.3-src-20251202.zip")
assert "error" in result
assert "Root directory mismatch" in result["error"]
def test_zipformat_structure_rejects_npm_pack_filename_mismatch(tmp_path:
pathlib.Path) -> None:
- archive_path = tmp_path / "example-1.2.3.zip"
- _make_zip_with_contents(
- archive_path,
+ cache_dir = _make_cache_tree_with_contents(
+ tmp_path,
{
"package/package.json": json.dumps({"name": "different",
"version": "1.2.3"}),
"package/README.txt": "hello",
},
)
- result = zipformat._structure_check_core_logic(str(archive_path))
+ result = zipformat._structure_check_core_logic(cache_dir,
"example-1.2.3.zip")
assert result.get("error") is not None
assert "npm pack layout detected" in result["error"]
@@ -232,20 +238,29 @@ def
test_zipformat_structure_rejects_npm_pack_filename_mismatch(tmp_path: pathli
def
test_zipformat_structure_rejects_package_root_without_package_json(tmp_path:
pathlib.Path) -> None:
- archive_path = tmp_path / "example-1.2.3.zip"
- _make_zip_with_contents(
- archive_path,
+ cache_dir = _make_cache_tree_with_contents(
+ tmp_path,
{
"package/README.txt": "hello",
},
)
- result = zipformat._structure_check_core_logic(str(archive_path))
+ result = zipformat._structure_check_core_logic(cache_dir,
"example-1.2.3.zip")
assert result.get("error") is not None
assert "Root directory mismatch" in result["error"]
+def test_zipformat_structure_rejects_top_level_symlink(tmp_path: pathlib.Path)
-> None:
+ cache_dir = _make_cache_tree(tmp_path, ["example-1.2.3/README.txt"])
+ (cache_dir / "link").symlink_to(cache_dir / "missing-target")
+
+ result = zipformat._structure_check_core_logic(cache_dir,
"example-1.2.3.zip")
+
+ assert result.get("error") is not None
+ assert "Files found directly in root" in result["error"]
+
+
def _make_cache_tree(base: pathlib.Path, members: list[str]) -> pathlib.Path:
"""Create a directory tree simulating the quarantine extraction cache."""
cache_dir = base / "cache"
@@ -268,18 +283,6 @@ def _make_cache_tree_with_contents(base: pathlib.Path,
members: dict[str, str])
return cache_dir
-def _make_zip(path: pathlib.Path, members: list[str]) -> None:
- with zipfile.ZipFile(path, "w") as zf:
- for name in members:
- zf.writestr(name, f"data-{name}")
-
-
-def _make_zip_with_contents(path: pathlib.Path, members: dict[str, str]) ->
None:
- with zipfile.ZipFile(path, "w") as zf:
- for name, content in members.items():
- zf.writestr(name, content)
-
-
async def _targz_structure_args(
tmp_path: pathlib.Path, archive_filename: str
) -> tuple[recorders.RecorderStub, checks.FunctionArguments]:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]