This is an automated email from the ASF dual-hosted git repository.

sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-atr-experiments.git
The following commit(s) were added to refs/heads/main by this push:
     new d1be8e9  Add aiofiles and use it to refactor file handling
d1be8e9 is described below

commit d1be8e9e5e153205a452e0dfa211298c3c738668
Author: Sean B. Palmer <s...@miscoranda.com>
AuthorDate: Fri Feb 14 20:21:25 2025 +0200

    Add aiofiles and use it to refactor file handling
---
 atr/routes.py  | 369 ++++++++++++++++++++++++++++++++++-----------------------
 poetry.lock    |  14 ++-
 pyproject.toml |   4 +-
 uv.lock        |  13 ++
 4 files changed, 250 insertions(+), 150 deletions(-)

diff --git a/atr/routes.py b/atr/routes.py
index b7f0421..bfbd3c1 100644
--- a/atr/routes.py
+++ b/atr/routes.py
@@ -17,27 +17,35 @@
 "routes.py"
 
+import asyncio
+import datetime
 import hashlib
-from io import BufferedReader
 import json
 import pprint
+import secrets
+import shutil
+import tempfile
+
+from contextlib import asynccontextmanager
+from io import BufferedReader
 from pathlib import Path
-from typing import List, Tuple, Optional, Dict, Any, cast
-import datetime
-import asyncio
+from typing import Any, Dict, List, Optional, Tuple, cast
+
+import aiofiles
+import aiofiles.os
+import gnupg
+import httpx
 
 from asfquart import APP
 from asfquart.auth import Requirements as R, require
 from asfquart.base import ASFQuartException
 from asfquart.session import read as session_read, ClientSession
-from quart import current_app, render_template, request
+from quart import current_app, render_template, request, Request
 from sqlmodel import select
 from sqlalchemy.ext.asyncio import AsyncSession
-from sqlalchemy.exc import IntegrityError
-from sqlalchemy.orm.attributes import InstrumentedAttribute
-import httpx
-import gnupg
 from sqlalchemy.orm import selectinload
+from sqlalchemy.orm.attributes import InstrumentedAttribute
+from werkzeug.datastructures import FileStorage
 
 from .models import (
     DistributionChannel,
@@ -58,6 +66,97 @@ if APP is ...:
 
 ALLOWED_USERS = {"cwells", "fluxo", "gmcdonald", "humbedooh", "sbp", "tn", "wave"}
 
+async def add_release_candidate_post(session: ClientSession, request: Request) -> str:
+    form = await request.form
+
+    project_name = form.get("project_name")
+    if not project_name:
+        raise ASFQuartException("Project name is required", errorcode=400)
+
+    # Verify user is a PMC member of the project
+    if project_name not in session.committees:
+        raise ASFQuartException(
+            f"You must be a PMC member of {project_name} to submit a release candidate", errorcode=403
+        )
+
+    # Get all uploaded files
+    files = await request.files
+
+    # Get the release artifact and signature files
+    artifact_file = files.get("release_artifact")
+    signature_file = files.get("release_signature")
+
+    if not artifact_file:
+        raise ASFQuartException("Release artifact file is required", errorcode=400)
+    if not signature_file:
+        raise ASFQuartException("Detached GPG signature file is required", errorcode=400)
+    if not signature_file.filename.endswith(".asc"):
+        # TODO: Could also check that it's artifact name + ".asc"
+        # And at least warn if it's not
+        raise ASFQuartException("Signature file must have .asc extension", errorcode=400)
+
+    # Save files using their hashes as filenames
+    uploads_path = Path(current_app.config["RELEASE_STORAGE_DIR"])
+    artifact_hash = await save_file_by_hash(uploads_path, artifact_file)
+    # TODO: Do we need to do anything with the signature hash?
+    # These should be identical, but path might be absolute?
+    # TODO: Need to check, ideally. Could have a data browser
+    signature_hash = await save_file_by_hash(uploads_path, signature_file)
+
+    # Generate a 128-bit random token for the release storage key
+    storage_token = secrets.token_hex(16)
+
+    # Compute SHA-512 checksum of the artifact for the package record
+    checksum_512 = compute_sha512(uploads_path / artifact_hash)
+
+    # Store in database
+    async_session = current_app.config["async_session"]
+    async with async_session() as db_session:
+        async with db_session.begin():
+            # Get PMC
+            statement = select(PMC).where(PMC.project_name == project_name)
+            pmc = (await db_session.execute(statement)).scalar_one_or_none()
+            if not pmc:
+                raise ASFQuartException("PMC not found", errorcode=404)
+
+            # Create release record using random token as storage key
+            # TODO: Extract version from filename or add to form
+            release = Release(
+                storage_key=storage_token,
+                stage=ReleaseStage.CANDIDATE,
+                phase=ReleasePhase.RELEASE_CANDIDATE,
+                pmc_id=pmc.id,
+                version="TODO",
+            )
+            db_session.add(release)
+
+            # Create package record
+            package = Package(
+                file=artifact_hash,
+                signature=signature_hash,
+                checksum=checksum_512,
+                release_key=release.storage_key,
+            )
+            db_session.add(package)
+
+    return f"Successfully uploaded release candidate for {project_name}"
+
+
+@asynccontextmanager
+async def ephemeral_gpg_home():
+    """
+    Create a temporary directory for an isolated GnuPG home, and clean it up on exit.
+    This is done asynchronously to avoid blocking the event loop.
+    """
+    # Create a temporary directory off-thread.
+    temp_dir = await asyncio.to_thread(tempfile.mkdtemp, prefix="gnupg-")
+    try:
+        yield temp_dir
+    finally:
+        # Remove the directory off-thread as well.
+        await asyncio.to_thread(shutil.rmtree, temp_dir)
+
+
 def compute_sha3_256(file_data: bytes) -> str:
     "Compute SHA3-256 hash of file data."
     return hashlib.sha3_256(file_data).hexdigest()
@@ -99,79 +198,7 @@ async def root_add_release_candidate() -> str:
 
     # For POST requests, handle the file upload
     if request.method == "POST":
-        form = await request.form
-
-        project_name = form.get("project_name")
-        if not project_name:
-            raise ASFQuartException("Project name is required", errorcode=400)
-
-        # Verify user is a PMC member of the project
-        if project_name not in session.committees:
-            raise ASFQuartException(
-                f"You must be a PMC member of {project_name} to submit a release candidate", errorcode=403
-            )
-
-        # Get all uploaded files
-        files = await request.files
-
-        # Get the release artifact and signature files
-        artifact_file = files.get("release_artifact")
-        signature_file = files.get("release_signature")
-
-        if not artifact_file:
-            raise ASFQuartException("Release artifact file is required", errorcode=400)
-        if not signature_file:
-            raise ASFQuartException("Detached GPG signature file is required", errorcode=400)
-        if not signature_file.filename.endswith(".asc"):
-            # TODO: Could also check that it's artifact name + ".asc"
-            # And at least warn if it's not
-            raise ASFQuartException("Signature file must have .asc extension", errorcode=400)
-
-        # Save files using their hashes as filenames
-        storage_dir = Path(current_app.config["RELEASE_STORAGE_DIR"]) / project_name
-        artifact_path, artifact_hash = await save_file_by_hash(artifact_file, storage_dir)
-        # TODO: Do we need to do anything with the signature hash?
-        # These should be identical, but path might be absolute?
-        # TODO: Need to check, ideally. Could have a data browser
-        signature_path, _ = await save_file_by_hash(signature_file, storage_dir)
-
-        # Compute SHA-512 checksum of the artifact for the package record
-        # We're using SHA-3-256 for the filename, so we need to use SHA-3-512 for the checksum
-        checksum_512 = compute_sha512(artifact_path)
-
-        # Store in database
-        async_session = current_app.config["async_session"]
-        async with async_session() as db_session:
-            async with db_session.begin():
-                # Get PMC
-                statement = select(PMC).where(PMC.project_name == project_name)
-                pmc = (await db_session.execute(statement)).scalar_one_or_none()
-                if not pmc:
-                    raise ASFQuartException("PMC not found", errorcode=404)
-
-                # Create release record using artifact hash as storage key
-                # At some point this presumably won't work, because we can have many artifacts
-                # But meanwhile it's fine
-                # TODO: Extract version from filename or add to form
-                release = Release(
-                    storage_key=artifact_hash,
-                    stage=ReleaseStage.CANDIDATE,
-                    phase=ReleasePhase.RELEASE_CANDIDATE,
-                    pmc_id=pmc.id,
-                    version="",
-                )
-                db_session.add(release)
-
-                # Create package record
-                package = Package(
-                    file=str(artifact_path.relative_to(current_app.config["RELEASE_STORAGE_DIR"])),
-                    signature=str(signature_path.relative_to(current_app.config["RELEASE_STORAGE_DIR"])),
-                    checksum=checksum_512,
-                    release_key=release.storage_key,
-                )
-                db_session.add(package)
-
-        return f"Successfully uploaded release candidate for {project_name}"
+        return await add_release_candidate_post(session, request)
 
     # For GET requests, show the form
     return await render_template(
@@ -278,9 +305,9 @@ async def root_admin_update_pmcs() -> str:
 
             # Update PMC data
             roster = info.get("roster", {})
-            # All roster members are PMC members
+            # TODO: Here we say that roster == pmc_members == committers
+            # We ought to do this more accurately instead
             pmc.pmc_members = list(roster.keys())
-            # All PMC members are also committers
             pmc.committers = list(roster.keys())
 
             # Mark chairs as release managers
@@ -331,14 +358,29 @@ async def root_release_signatures_verify(release_key: str) -> str:
 
     async_session = current_app.config["async_session"]
     async with async_session() as db_session:
-        # Get the release and its packages
+        # Get the release and its packages, and PMC with its keys
         release_packages = selectinload(cast(InstrumentedAttribute[List[Package]], Release.packages))
         release_pmc = selectinload(cast(InstrumentedAttribute[PMC], Release.pmc))
-        statement = select(Release).options(release_packages, release_pmc).where(Release.storage_key == release_key)
+        pmc_keys_loader = selectinload(cast(InstrumentedAttribute[PMC], Release.pmc)).selectinload(
+            cast(InstrumentedAttribute[List[PublicSigningKey]], PMC.public_signing_keys)
+        )
+
+        # For now, for debugging, we'll just get all keys in the database
+        statement = select(PublicSigningKey)
+        all_public_keys = (await db_session.execute(statement)).scalars().all()
+
+        statement = (
+            select(Release)
+            .options(release_packages, release_pmc, pmc_keys_loader)
+            .where(Release.storage_key == release_key)
+        )
         release = (await db_session.execute(statement)).scalar_one_or_none()
         if not release:
             raise ASFQuartException("Release not found", errorcode=404)
 
+        # Get all public keys associated with the PMC
+        pmc_keys = [key.public_key for key in all_public_keys]
+
         # Verify each package's signature
         verification_results = []
        storage_dir = Path(current_app.config["RELEASE_STORAGE_DIR"])
@@ -355,7 +397,7 @@ async def root_release_signatures_verify(release_key: str) -> str:
                 result["error"] = "Package signature file not found"
"Package signature file not found" else: # Verify the signature - result = await verify_gpg_signature(artifact_path, signature_path) + result = await verify_gpg_signature(artifact_path, signature_path, pmc_keys) result["file"] = package.file verification_results.append(result) @@ -391,35 +433,35 @@ async def root_pmc_arg(project_name: str) -> dict: } -@APP.route("/pmc/create/<project_name>") -async def root_pmc_create_arg(project_name: str) -> dict: - "Create a new PMC with some sample data." - pmc = PMC( - project_name=project_name, - pmc_members=["alice", "bob"], - committers=["charlie", "dave"], - release_managers=["alice"], - ) - - async_session = current_app.config["async_session"] - async with async_session() as db_session: - async with db_session.begin(): - try: - db_session.add(pmc) - except IntegrityError: - raise ASFQuartException( - f"PMC with name '{project_name}' already exists", - errorcode=409, # HTTP 409 Conflict - ) - - # Convert to dict for response - return { - "id": pmc.id, - "project_name": pmc.project_name, - "pmc_members": pmc.pmc_members, - "committers": pmc.committers, - "release_managers": pmc.release_managers, - } +# @APP.route("/pmc/create/<project_name>") +# async def root_pmc_create_arg(project_name: str) -> dict: +# "Create a new PMC with some sample data." +# pmc = PMC( +# project_name=project_name, +# pmc_members=["alice", "bob"], +# committers=["charlie", "dave"], +# release_managers=["alice"], +# ) + +# async_session = current_app.config["async_session"] +# async with async_session() as db_session: +# async with db_session.begin(): +# try: +# db_session.add(pmc) +# except IntegrityError: +# raise ASFQuartException( +# f"PMC with name '{project_name}' already exists", +# errorcode=409, # HTTP 409 Conflict +# ) + +# # Convert to dict for response +# return { +# "id": pmc.id, +# "project_name": pmc.project_name, +# "pmc_members": pmc.pmc_members, +# "committers": pmc.committers, +# "release_managers": pmc.release_managers, +# } @APP.route("/pmc/directory") @@ -554,25 +596,42 @@ async def root_user_uploads() -> str: return await render_template("user-uploads.html", releases=user_releases) -async def save_file_by_hash(file, base_dir: Path) -> Tuple[Path, str]: +async def save_file_by_hash(base_dir: Path, file: FileStorage) -> str: """ Save a file using its SHA3-256 hash as the filename. Returns the path where the file was saved and its hash. 
""" - # FileStorage.read() returns bytes directly, no need to await - data = file.read() - file_hash = compute_sha3_256(data) - - # Create path with hash as filename - path = base_dir / file_hash - path.parent.mkdir(parents=True, exist_ok=True) + sha3 = hashlib.sha3_256() - # Only write if file doesn't exist - # If it does exist, it'll be the same content anyway - if not path.exists(): - path.write_bytes(data) - - return path, file_hash + # Create temporary file to stream to while computing hash + temp_path = base_dir / f"temp-{secrets.token_hex(8)}" + try: + stream = file.stream + + async with aiofiles.open(temp_path, "wb") as f: + while True: + chunk = await asyncio.to_thread(stream.read, 8192) + if not chunk: + break + sha3.update(chunk) + await f.write(chunk) + + file_hash = sha3.hexdigest() + final_path = base_dir / file_hash + + # Only move to final location if it doesn't exist + # This can race, but it's hash based so it's okay + if not await aiofiles.os.path.exists(final_path): + await aiofiles.os.rename(temp_path, final_path) + else: + # If file already exists, just remove the temp file + await aiofiles.os.remove(temp_path) + + return file_hash + except Exception as e: + if await aiofiles.os.path.exists(temp_path): + await aiofiles.os.remove(temp_path) + raise e async def user_keys_add(session: ClientSession, public_key: str) -> Tuple[str, Optional[dict]]: @@ -581,17 +640,19 @@ async def user_keys_add(session: ClientSession, public_key: str) -> Tuple[str, O # Import the key into GPG to validate and extract info # TODO: We'll just assume for now that gnupg.GPG() doesn't need to be async - gpg = gnupg.GPG() - import_result = await asyncio.to_thread(gpg.import_keys, public_key) + async with ephemeral_gpg_home() as gpg_home: + gpg = gnupg.GPG(gnupghome=gpg_home) + import_result = await asyncio.to_thread(gpg.import_keys, public_key) + + if not import_result.fingerprints: + return ("Invalid public key format", None) - if not import_result.fingerprints: - return ("Invalid public key format", None) + fingerprint = import_result.fingerprints[0] + # Get key details + # We could probably use import_result instead + # But this way it shows that they've really been imported + keys = await asyncio.to_thread(gpg.list_keys) - fingerprint = import_result.fingerprints[0] - # Get key details - # We could probably use import_result instead - # But this way it shows that they've really been imported - keys = await asyncio.to_thread(gpg.list_keys) # Then we have the properties listed here: # https://gnupg.readthedocs.io/en/latest/#listing-keys # Note that "fingerprint" is not listed there, but we have it anyway... 
@@ -614,11 +675,11 @@ async def user_keys_add_session(
 ) -> Tuple[str, Optional[dict]]:
     # Check if key already exists
     statement = select(PublicSigningKey).where(PublicSigningKey.user_id == session.uid)
-    existing_key = (await db_session.execute(statement)).scalar_one_or_none()
-    if existing_key:
-        # TODO: We should allow more than one key per user
-        return ("You already have a key registered", None)
+    # # If uncommented, this will prevent a user from adding a second key
+    # existing_key = (await db_session.execute(statement)).scalar_one_or_none()
+    # if existing_key:
+    #     return ("You already have a key registered", None)
 
     if not session.uid:
         return ("You must be signed in to add a key", None)
@@ -659,15 +720,14 @@ async def user_keys_add_session(
     )
 
 
-async def verify_gpg_signature(artifact_path: Path, signature_path: Path) -> Dict[str, Any]:
+async def verify_gpg_signature(artifact_path: Path, signature_path: Path, public_keys: List[str]) -> Dict[str, Any]:
     """
     Verify a GPG signature for a release artifact.
 
     Returns a dictionary with verification results and debug information.
     """
-    gpg = gnupg.GPG()
     try:
         with open(signature_path, "rb") as sig_file:
-            return await verify_gpg_signature_file(gpg, sig_file, artifact_path)
+            return await verify_gpg_signature_file(sig_file, artifact_path, public_keys)
     except Exception as e:
         return {
             "verified": False,
@@ -677,9 +737,21 @@ async def verify_gpg_signature(artifact_path: Path, signature_path: Path) -> Dic
 
 
-async def verify_gpg_signature_file(gpg: gnupg.GPG, sig_file: BufferedReader, artifact_path: Path) -> Dict[str, Any]:
+async def verify_gpg_signature_file(
+    sig_file: BufferedReader, artifact_path: Path, public_keys: List[str]
+) -> Dict[str, Any]:
     # Run the blocking GPG verification in a thread
-    verified = await asyncio.to_thread(gpg.verify_file, sig_file, str(artifact_path))
+    async with ephemeral_gpg_home() as gpg_home:
+        gpg = gnupg.GPG(gnupghome=gpg_home)
+
+        # Import all PMC public keys
+        for key in public_keys:
+            import_result = await asyncio.to_thread(gpg.import_keys, key)
+            if not import_result.fingerprints:
+                # TODO: Log warning about invalid key?
+                continue
+
+        verified = await asyncio.to_thread(gpg.verify_file, sig_file, str(artifact_path))
 
     # Collect all available information for debugging
     debug_info = {
@@ -694,6 +766,7 @@ async def verify_gpg_signature_file(gpg: gnupg.GPG, sig_file: BufferedReader, ar
         "trust_level": verified.trust_level if hasattr(verified, "trust_level") else "Not available",
         "trust_text": verified.trust_text if hasattr(verified, "trust_text") else "Not available",
         "stderr": verified.stderr if hasattr(verified, "stderr") else "Not available",
+        "num_pmc_keys": len(public_keys),
     }
 
     if not verified:
diff --git a/poetry.lock b/poetry.lock
index 587998b..4883676 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -2343,6 +2343,18 @@ notebook = ["ipywidgets (>=6)"]
 slack = ["slack-sdk"]
 telegram = ["requests"]
 
+[[package]]
+name = "types-aiofiles"
+version = "24.1.0.20241221"
+description = "Typing stubs for aiofiles"
+optional = false
+python-versions = ">=3.8"
+groups = ["dev"]
+files = [
+    {file = "types_aiofiles-24.1.0.20241221-py3-none-any.whl", hash = "sha256:11d4e102af0627c02e8c1d17736caa3c39de1058bea37e2f4de6ef11a5b652ab"},
+    {file = "types_aiofiles-24.1.0.20241221.tar.gz", hash = "sha256:c40f6c290b0af9e902f7f3fa91213cf5bb67f37086fb21dc0ff458253586ad55"},
+]
+
 [[package]]
 name = "typing-extensions"
 version = "4.12.2"
@@ -2623,4 +2635,4 @@ propcache = ">=0.2.0"
 [metadata]
 lock-version = "2.1"
 python-versions = "~=3.13"
-content-hash = "5baaf15a0054517a86b73b73523916b19abb26f4368b49396673d991c1de370b"
+content-hash = "83ebf1f0e0465eb2fdd013705e58ed06d047baa560bdfd0196d327ba015f30d1"
diff --git a/pyproject.toml b/pyproject.toml
index 3478d5a..5b17146 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -9,6 +9,7 @@ license = "Apache-2.0"
 readme = "README.md"
 requires-python = "~=3.13"
 dependencies = [
+    "aiofiles>=24.1.0,<25.0.0",
     "aiosqlite>=0.21.0,<0.22.0",
     "alembic~=1.14",
     "asfquart", # TODO: convert asfquart from a source dependency to pypi or git dependency
@@ -27,6 +28,7 @@ dev = [
     "pre-commit>=2.20.0",
     "pyright>=1.1.393",
     "ruff>=0.9.4",
+    "types-aiofiles>=24.1.0.20241221,<25.0.0.0",
 ]
 
 # Other configuration
@@ -50,7 +52,7 @@ mypy = "^1.15.0"
 pre-commit = ">=2.20.0"
 pyright = ">=1.1.393"
 ruff = ">=0.9.4"
-
+types-aiofiles = ">=24.1.0.20241221,<25.0.0.0"
 
 # UV specific configuration
 [tool.uv.sources]
diff --git a/uv.lock b/uv.lock
index c970bec..38f9404 100644
--- a/uv.lock
+++ b/uv.lock
@@ -1099,6 +1099,7 @@ name = "tooling-atr-experiment"
 version = "0.0.1"
 source = { virtual = "." }
 dependencies = [
+    { name = "aiofiles" },
     { name = "aiosqlite" },
     { name = "alembic" },
     { name = "asfquart" },
@@ -1117,10 +1118,12 @@ dev = [
     { name = "pre-commit" },
     { name = "pyright" },
     { name = "ruff" },
+    { name = "types-aiofiles" },
 ]
 
 [package.metadata]
 requires-dist = [
+    { name = "aiofiles", specifier = ">=24.1.0,<25.0.0" },
     { name = "aiosqlite", specifier = ">=0.21.0,<0.22.0" },
     { name = "alembic", specifier = "~=1.14" },
     { name = "asfquart", editable = "asfquart" },
@@ -1139,6 +1142,7 @@ dev = [
     { name = "pre-commit", specifier = ">=2.20.0" },
     { name = "pyright", specifier = ">=1.1.393" },
     { name = "ruff", specifier = ">=0.9.4" },
+    { name = "types-aiofiles", specifier = ">=24.1.0.20241221,<25.0.0.0" },
 ]
 
 [[package]]
@@ -1153,6 +1157,15 @@
 wheels = [
     { url = "https://files.pythonhosted.org/packages/d0/30/dc54f88dd4a2b5dc8a0279bdd7270e735851848b762aeb1c1184ed1f6b14/tqdm-4.67.1-py3-none-any.whl", hash = "sha256:26445eca388f82e72884e0d580d5464cd801a3ea01e63e5601bdff9ba6a48de2", size = 78540 },
 ]
 
+[[package]]
+name = "types-aiofiles"
+version = "24.1.0.20241221"
+source = { registry = "https://pypi.org/simple" }
+sdist = { url = "https://files.pythonhosted.org/packages/ab/5e/f984b9ddc7eecdf31e683e692d933f3672276ed95aad6adb9aea9ecbdc29/types_aiofiles-24.1.0.20241221.tar.gz", hash = "sha256:c40f6c290b0af9e902f7f3fa91213cf5bb67f37086fb21dc0ff458253586ad55", size = 14081 }
+wheels = [
+    { url = "https://files.pythonhosted.org/packages/ff/da/77902220df98ce920444cf3611fa0b1cf0dc2cfa5a137c55e93829aa458e/types_aiofiles-24.1.0.20241221-py3-none-any.whl", hash = "sha256:11d4e102af0627c02e8c1d17736caa3c39de1058bea37e2f4de6ef11a5b652ab", size = 14162 },
+]
+
 [[package]]
 name = "typing-extensions"
 version = "4.12.2"

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tooling.apache.org
For additional commands, e-mail: dev-h...@tooling.apache.org