This is an automated email from the ASF dual-hosted git repository. sbp pushed a commit to branch sbp in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git
commit ecd6fe6116b3118ca9521f4b2e9aadb04bf4dbb0 Author: Sean B. Palmer <[email protected]> AuthorDate: Fri Mar 6 17:05:00 2026 +0000 Extract quarantined archives to a staging directory first --- atr/tasks/quarantine.py | 19 ++++-- tests/unit/test_quarantine_task.py | 128 +++++++++++++++++++++++++++++++++++++ 2 files changed, 143 insertions(+), 4 deletions(-) diff --git a/atr/tasks/quarantine.py b/atr/tasks/quarantine.py index c438907b..b0d16f23 100644 --- a/atr/tasks/quarantine.py +++ b/atr/tasks/quarantine.py @@ -19,7 +19,9 @@ from __future__ import annotations import asyncio import datetime +import errno import pathlib +import uuid import aiofiles.os import aioshutil @@ -100,7 +102,9 @@ async def _extract_archives_to_cache( ) -> None: conf = config.get() cache_base = paths.get_cache_archives_dir() / project_name / version_name + staging_base = paths.get_tmp_dir() await aiofiles.os.makedirs(cache_base, exist_ok=True) + await aiofiles.os.makedirs(staging_base, exist_ok=True) extraction_config = ( exarch.SecurityConfig() @@ -123,18 +127,25 @@ async def _extract_archives_to_cache( if await aiofiles.os.path.isdir(cache_dir): continue archive_path = str(quarantine_dir / archive.rel_path) - extract_dir = str(cache_dir) - await aiofiles.os.makedirs(extract_dir, exist_ok=True) + staging_dir = staging_base / f"archive-extract-{uuid.uuid4().hex}" + await aiofiles.os.makedirs(staging_dir, exist_ok=False) try: await asyncio.to_thread( exarch.extract_archive, archive_path, - extract_dir, + str(staging_dir), extraction_config, ) + try: + await aiofiles.os.rename(staging_dir, cache_dir) + except OSError as err: + if isinstance(err, FileExistsError) or err.errno in {errno.EEXIST, errno.ENOTEMPTY}: + await aioshutil.rmtree(staging_dir, ignore_errors=True) + else: + raise except Exception: log.exception(f"Failed to extract archive {archive.rel_path} to cache") - await aioshutil.rmtree(cache_dir, ignore_errors=True) + await aioshutil.rmtree(staging_dir, ignore_errors=True) raise 
diff --git a/tests/unit/test_quarantine_task.py b/tests/unit/test_quarantine_task.py index 0ea42b54..08ad6326 100644 --- a/tests/unit/test_quarantine_task.py +++ b/tests/unit/test_quarantine_task.py @@ -15,11 +15,13 @@ # specific language governing permissions and limitations # under the License. +import errno import io import pathlib import tarfile import unittest.mock as mock +import aiofiles import pytest import atr.models.safe as safe @@ -28,6 +30,132 @@ import atr.tasks as tasks import atr.tasks.quarantine as quarantine +@pytest.mark.asyncio +async def test_extract_archives_to_cache_discards_staging_dir_when_other_worker_wins( + monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path +) -> None: + quarantine_dir = tmp_path / "quarantine" + quarantine_dir.mkdir() + archive_rel_path = "artifact.tar.gz" + (quarantine_dir / archive_rel_path).write_bytes(b"archive") + cache_root = tmp_path / "cache" + tmp_root = tmp_path / "temporary" + recorded: dict[str, pathlib.Path] = {} + + def extract_archive(_archive_path: str, extract_dir: str, _config: object) -> None: + staging_dir = pathlib.Path(extract_dir) + recorded["staging_dir"] = staging_dir + (staging_dir / "content.txt").write_text("staged") + + async def rename(src: pathlib.Path | str, dst: pathlib.Path | str) -> None: + dst_path = pathlib.Path(dst) + await aiofiles.os.makedirs(dst_path, exist_ok=True) + async with aiofiles.open(dst_path / "winner.txt", "w") as f: + await f.write("winner") + raise FileExistsError(dst) + + monkeypatch.setattr(quarantine.paths, "get_cache_archives_dir", lambda: cache_root) + monkeypatch.setattr(quarantine.paths, "get_tmp_dir", lambda: tmp_root) + monkeypatch.setattr(quarantine.exarch, "extract_archive", extract_archive) + monkeypatch.setattr(quarantine.aiofiles.os, "rename", rename) + + await quarantine._extract_archives_to_cache( + [quarantine.QuarantineArchiveEntry(rel_path=archive_rel_path, content_hash="blake3:def")], + quarantine_dir, + "proj", + "1.0", + ) + + cache_dir = 
cache_root / "proj" / "1.0" / quarantine.hashes.filesystem_cache_archives_key("blake3:def") + + assert cache_dir.is_dir() + assert (cache_dir / "winner.txt").read_text() == "winner" + assert not recorded["staging_dir"].exists() + + +@pytest.mark.asyncio +async def test_extract_archives_to_cache_discards_staging_dir_on_enotempty_collision( + monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path +) -> None: + quarantine_dir = tmp_path / "quarantine" + quarantine_dir.mkdir() + archive_rel_path = "artifact.tar.gz" + (quarantine_dir / archive_rel_path).write_bytes(b"archive") + cache_root = tmp_path / "cache" + tmp_root = tmp_path / "temporary" + recorded: dict[str, pathlib.Path] = {} + + def extract_archive(_archive_path: str, extract_dir: str, _config: object) -> None: + staging_dir = pathlib.Path(extract_dir) + recorded["staging_dir"] = staging_dir + (staging_dir / "content.txt").write_text("staged") + + async def rename(src: pathlib.Path | str, dst: pathlib.Path | str) -> None: + dst_path = pathlib.Path(dst) + await aiofiles.os.makedirs(dst_path, exist_ok=True) + async with aiofiles.open(dst_path / "winner.txt", "w") as f: + await f.write("winner") + raise OSError(errno.ENOTEMPTY, "Directory not empty", str(dst_path)) + + monkeypatch.setattr(quarantine.paths, "get_cache_archives_dir", lambda: cache_root) + monkeypatch.setattr(quarantine.paths, "get_tmp_dir", lambda: tmp_root) + monkeypatch.setattr(quarantine.exarch, "extract_archive", extract_archive) + monkeypatch.setattr(quarantine.aiofiles.os, "rename", rename) + + await quarantine._extract_archives_to_cache( + [quarantine.QuarantineArchiveEntry(rel_path=archive_rel_path, content_hash="blake3:ghi")], + quarantine_dir, + "proj", + "1.0", + ) + + cache_dir = cache_root / "proj" / "1.0" / quarantine.hashes.filesystem_cache_archives_key("blake3:ghi") + + assert cache_dir.is_dir() + assert (cache_dir / "winner.txt").read_text() == "winner" + assert not recorded["staging_dir"].exists() + + +@pytest.mark.asyncio +async def 
test_extract_archives_to_cache_stages_in_temporary_then_promotes( + monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path +) -> None: + quarantine_dir = tmp_path / "quarantine" + quarantine_dir.mkdir() + archive_rel_path = "artifact.tar.gz" + (quarantine_dir / archive_rel_path).write_bytes(b"archive") + cache_root = tmp_path / "cache" + tmp_root = tmp_path / "temporary" + recorded: dict[str, str] = {} + + def extract_archive(archive_path: str, extract_dir: str, _config: object) -> None: + recorded["archive_path"] = archive_path + recorded["extract_dir"] = extract_dir + extract_path = pathlib.Path(extract_dir) + (extract_path / "content.txt").write_text("cached") + + monkeypatch.setattr(quarantine.paths, "get_cache_archives_dir", lambda: cache_root) + monkeypatch.setattr(quarantine.paths, "get_tmp_dir", lambda: tmp_root) + monkeypatch.setattr(quarantine.exarch, "extract_archive", extract_archive) + + await quarantine._extract_archives_to_cache( + [quarantine.QuarantineArchiveEntry(rel_path=archive_rel_path, content_hash="blake3:abc")], + quarantine_dir, + "proj", + "1.0", + ) + + cache_dir = cache_root / "proj" / "1.0" / quarantine.hashes.filesystem_cache_archives_key("blake3:abc") + staging_base = tmp_root + + assert recorded["archive_path"] == str(quarantine_dir / archive_rel_path) + assert pathlib.Path(recorded["extract_dir"]).parent == staging_base + assert pathlib.Path(recorded["extract_dir"]) != cache_dir + assert cache_dir.is_dir() + assert (cache_dir / "content.txt").read_text() == "cached" + assert list(staging_base.iterdir()) == [] + + @pytest.mark.asyncio async def test_mark_failed_persists_on_managed_instance(): # This is a regression test for a bug during development --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
