This is an automated email from the ASF dual-hosted git repository. sbp pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git
commit 4682d79514ad4d6b0db801d048ceec14a63319e0 Author: Alastair McFarlane <[email protected]> AuthorDate: Tue Jan 13 15:38:23 2026 +0000 Rename GHA task to be more specific, and work in extra required parameters. UI work to trigger distributions and endpoint to register an SSH key. Include workflow status recording. --- atr/api/__init__.py | 52 ++++++++- atr/db/__init__.py | 20 ++++ atr/db/interaction.py | 51 +++++++-- atr/form.py | 4 + atr/get/distribution.py | 85 +++++++++++--- atr/get/finish.py | 140 ++++++++++++++++++----- atr/models/api.py | 31 +++++ atr/models/results.py | 6 +- atr/models/sql.py | 23 +++- atr/post/distribution.py | 103 +++++++++++++++-- atr/shared/distribution.py | 13 +-- atr/storage/__init__.py | 1 + atr/storage/writers/__init__.py | 2 + atr/storage/writers/distributions.py | 46 +++++++- atr/storage/writers/workflowstatus.py | 134 ++++++++++++++++++++++ atr/tasks/__init__.py | 2 +- atr/tasks/gha.py | 146 ++++++++++++++++-------- atr/templates/check-selected.html | 12 +- atr/worker.py | 5 +- migrations/versions/0037_2026.01.13_0cefcaea.py | 46 ++++++++ 20 files changed, 789 insertions(+), 133 deletions(-) diff --git a/atr/api/__init__.py b/atr/api/__init__.py index 06c932e..ee94a8e 100644 --- a/atr/api/__init__.py +++ b/atr/api/__init__.py @@ -251,6 +251,54 @@ async def committees_list() -> DictResponse: ).model_dump(), 200 [email protected]("/distribute/ssh/register", methods=["POST"]) +@quart_schema.validate_request(models.api.DistributeSshRegisterArgs) +async def distribute_ssh_register(data: models.api.DistributeSshRegisterArgs) -> DictResponse: + """ + Register an SSH key sent with a corroborating Trusted Publisher JWT, + validating the requested version is in the correct phase. 
+ """ + payload, asf_uid, project = await interaction.trusted_jwt_for_version( + data.publisher, data.jwt, interaction.TrustedProjectPhase(data.phase), data.version + ) + async with storage.write_as_committee_member(util.unwrap(project.committee).name, asf_uid) as wacm: + fingerprint, expires = await wacm.ssh.add_workflow_key( + payload["actor"], + payload["actor_id"], + project.name, + data.ssh_key, + ) + + return models.api.DistributeSshRegisterResults( + endpoint="/distribute/ssh/register", + fingerprint=fingerprint, + project=project.name, + expires=expires, + ).model_dump(), 200 + + +@api.route("/distribute/task/status", methods=["POST"]) +@quart_schema.validate_request(models.api.DistributeStatusUpdateArgs) +async def update_distribution_task_status(data: models.api.DistributeStatusUpdateArgs) -> DictResponse: + """ + Update the status of a distribution task + """ + _payload, _asf_uid = await interaction.validate_trusted_jwt(data.publisher, data.jwt) + async with db.session() as db_data: + status = await db_data.workflow_status( + workflow_id=data.workflow, + project_name=data.project_name, + run_id=data.run_id, + ).demand(exceptions.NotFound(f"Workflow {data.workflow} not found")) + status.status = data.status + status.message = data.message + await db_data.commit() + return models.api.DistributeStatusUpdateResults( + endpoint="/distribute/task/status", + success=True, + ).model_dump(), 200 + + @api.route("/distribution/record", methods=["POST"]) @jwtoken.require @quart_schema.security_scheme([{"BearerAuth": []}]) @@ -279,7 +327,7 @@ async def distribution_record(data: models.api.DistributionRecordArgs) -> DictRe async with storage.write(asf_uid) as write: wacm = write.as_committee_member(release.committee.name) await wacm.distributions.record_from_data( - release, + release.name, data.staging, dd, ) @@ -656,7 +704,7 @@ async def publisher_distribution_record(data: models.api.PublisherDistributionRe async with storage.write(asf_uid) as write: wacm = 
write.as_committee_member(release.committee.name) await wacm.distributions.record_from_data( - release, + release.name, data.staging, dd, ) diff --git a/atr/db/__init__.py b/atr/db/__init__.py index 6d6e3ec..3f00a32 100644 --- a/atr/db/__init__.py +++ b/atr/db/__init__.py @@ -731,6 +731,26 @@ class Session(sqlalchemy.ext.asyncio.AsyncSession): return Query(self, query) + def workflow_status( + self, + workflow_id: Opt[str] = NOT_SET, + run_id: Opt[int] = NOT_SET, + project_name: Opt[str] = NOT_SET, + task_id: Opt[int] = NOT_SET, + ) -> Query[sql.WorkflowStatus]: + query = sqlmodel.select(sql.WorkflowStatus) + + if is_defined(workflow_id): + query = query.where(sql.WorkflowStatus.workflow_id == workflow_id) + if is_defined(run_id): + query = query.where(sql.WorkflowStatus.run_id == run_id) + if is_defined(project_name): + query = query.where(sql.WorkflowStatus.project_name == project_name) + if is_defined(task_id): + query = query.where(sql.WorkflowStatus.task_id == task_id) + + return Query(self, query) + async def create_async_engine(app_config: type[config.AppConfig]) -> sqlalchemy.ext.asyncio.AsyncEngine: absolute_db_path = os.path.join(app_config.STATE_DIR, app_config.SQLITE_DB_PATH) diff --git a/atr/db/interaction.py b/atr/db/interaction.py index f8026f8..46c272c 100644 --- a/atr/db/interaction.py +++ b/atr/db/interaction.py @@ -19,7 +19,7 @@ import contextlib import datetime import enum from collections.abc import AsyncGenerator, Sequence -from typing import Any +from typing import Any, Final import packaging.version as version import sqlalchemy @@ -36,6 +36,8 @@ import atr.user as user import atr.util as util import atr.web as web +_GITHUB_TRUSTED_ROLE_NID: Final[int] = 254436773 + class ApacheUserMissingError(RuntimeError): def __init__(self, message: str, fingerprint: str | None, primary_uid: str | None) -> None: @@ -170,15 +172,6 @@ async def full_releases(project: sql.Project) -> list[sql.Release]: return await releases_by_phase(project, 
sql.ReleasePhase.RELEASE) -async def trusted_jwt(publisher: str, jwt: str, phase: TrustedProjectPhase) -> tuple[dict[str, Any], str, sql.Project]: - if publisher != "github": - raise InteractionError(f"Publisher {publisher} not supported") - payload = await jwtoken.verify_github_oidc(jwt) - asf_uid = await ldap.github_to_apache(payload["actor_id"]) - project = await _trusted_project(payload["repository"], payload["workflow_ref"], phase) - return payload, asf_uid, project - - async def has_failing_checks(release: sql.Release, revision_number: str, caller_data: db.Session | None = None) -> bool: async with db.ensure_session(caller_data) as data: query = ( @@ -395,6 +388,33 @@ async def tasks_ongoing_revision( return task_count, latest_revision +async def trusted_jwt(publisher: str, jwt: str, phase: TrustedProjectPhase) -> tuple[dict[str, Any], str, sql.Project]: + payload, asf_uid = await validate_trusted_jwt(publisher, jwt) + # JWT could be for an ASF user or the trusted role, but we need a user here. 
+ if asf_uid is None: + raise InteractionError("ASF user not found") + project = await _trusted_project(payload["repository"], payload["workflow_ref"], phase) + return payload, asf_uid, project + + +async def trusted_jwt_for_version( + publisher: str, jwt: str, phase: TrustedProjectPhase, version_name: str +) -> tuple[dict[str, Any], str, sql.Project]: + payload, asf_uid, project = await trusted_jwt(publisher, jwt, phase) + async with db.session() as db_data: + release = await db_data.release(project_name=project.name, version=version_name).get() + if not release: + raise InteractionError(f"Release {version_name} does not exist in project {project.name}") + if phase == TrustedProjectPhase.COMPOSE and release.phase != sql.ReleasePhase.RELEASE_CANDIDATE_DRAFT: + raise InteractionError(f"Release {version_name} is not in compose phase") + if phase == TrustedProjectPhase.VOTE and release.phase != sql.ReleasePhase.RELEASE_CANDIDATE: + raise InteractionError(f"Release {version_name} is not in vote phase") + if phase == TrustedProjectPhase.FINISH and release.phase != sql.ReleasePhase.RELEASE_PREVIEW: + raise InteractionError(f"Release {version_name} is not in finish phase") + + return payload, asf_uid, project + + async def unfinished_releases(asfuid: str) -> list[tuple[str, str, list[sql.Release]]]: releases: list[tuple[str, str, list[sql.Release]]] = [] async with db.session() as data: @@ -455,6 +475,17 @@ async def user_projects(asf_uid: str, caller_data: db.Session | None = None) -> return [(p.name, p.display_name) for p in projects] +async def validate_trusted_jwt(publisher: str, jwt: str) -> tuple[dict[str, Any], str | None]: + if publisher != "github": + raise InteractionError(f"Publisher {publisher} not supported") + payload = await jwtoken.verify_github_oidc(jwt) + if payload["actor_id"] != _GITHUB_TRUSTED_ROLE_NID: + asf_uid = await ldap.github_to_apache(payload["actor_id"]) + else: + asf_uid = None + return payload, asf_uid + + async def wait_for_task( task: 
sql.Task, caller_data: db.Session | None = None, diff --git a/atr/form.py b/atr/form.py index 6f40775..909b0e6 100644 --- a/atr/form.py +++ b/atr/form.py @@ -517,6 +517,10 @@ Email = pydantic.EmailStr class Enum[EnumType: enum.Enum]: + # These exist for type checkers - at runtime, the actual type is the enum + name: str + value: str | int + @staticmethod def __class_getitem__(enum_class: type[EnumType]): def validator(v: Any) -> EnumType: diff --git a/atr/get/distribution.py b/atr/get/distribution.py index 4f166a9..9e6d57f 100644 --- a/atr/get/distribution.py +++ b/atr/get/distribution.py @@ -29,30 +29,35 @@ import atr.util as util import atr.web as web [email protected]("/distributions/list/<project>/<version>") -async def list_get(session: web.Committer, project: str, version: str) -> str: [email protected]("/distribution/automate/<project>/<version>") +async def automate(session: web.Committer, project: str, version: str) -> str: + return await _automate_form_page(project, version, staging=False) + + [email protected]("/distributions/list/<project_name>/<version_name>") +async def list_get(session: web.Committer, project_name: str, version_name: str) -> str: async with db.session() as data: distributions = await data.distribution( - release_name=sql.release_name(project, version), + release_name=sql.release_name(project_name, version_name), ).all() block = htm.Block() - release = await shared.distribution.release_validated(project, version, staging=None) + release = await shared.distribution.release_validated(project_name, version_name, staging=None) staging = release.phase == sql.ReleasePhase.RELEASE_CANDIDATE_DRAFT - render.html_nav_phase(block, project, version, staging) + render.html_nav_phase(block, project_name, version_name, staging) record_a_distribution = htm.a( ".btn.btn-primary", href=util.as_url( - stage if staging else record, - project=project, - version=version, + stage_record if staging else record, + project=project_name, + version=version_name, 
), )["Record a distribution"] # Distribution list for project-version - block.h1["Distribution list for ", htm.em[f"{project}-{version}"]] + block.h1["Distribution list for ", htm.em[f"{project_name}-{version_name}"]] if not distributions: block.p["No distributions found."] block.p[record_a_distribution] @@ -89,7 +94,7 @@ async def list_get(session: web.Committer, project: str, version: str) -> str: delete_form = form.render( model_cls=shared.distribution.DeleteForm, - action=util.as_url(post.distribution.delete, project=project, version=version), + action=util.as_url(post.distribution.delete, project=project_name, version=version_name), form_classes=".d-inline-block.m-0", submit_classes="btn-danger btn-sm", submit_label="Delete", @@ -105,7 +110,7 @@ async def list_get(session: web.Committer, project: str, version: str) -> str: ) block.append(htm.div(".mb-3")[delete_form]) - title = f"Distribution list for {project} {version}" + title = f"Distribution list for {project_name} {version_name}" return await template.blank(title, content=block.collect()) @@ -114,11 +119,57 @@ async def record(session: web.Committer, project: str, version: str) -> str: return await _record_form_page(project, version, staging=False) [email protected]("/distribution/stage/<project>/<version>") -async def stage(session: web.Committer, project: str, version: str) -> str: [email protected]("/distribution/stage/automate/<project>/<version>") +async def stage_automate(session: web.Committer, project: str, version: str) -> str: + return await _automate_form_page(project, version, staging=True) + + [email protected]("/distribution/stage/record/<project>/<version>") +async def stage_record(session: web.Committer, project: str, version: str) -> str: return await _record_form_page(project, version, staging=True) +async def _automate_form_page(project: str, version: str, staging: bool) -> str: + """Helper to render the distribution automation form page.""" + await 
shared.distribution.release_validated(project, version, staging=staging) + + block = htm.Block() + render.html_nav_phase(block, project, version, staging=staging) + + title = "Create a staging distribution" if staging else "Create a distribution" + block.h1[title] + + block.p[ + "Create a distribution of ", + htm.strong[f"{project}-{version}"], + " using the form below.", + ] + block.p[ + "You can also ", + htm.a(href=util.as_url(list_get, project_name=project, version_name=version))["view the distribution list"], + ".", + ] + + # Determine the action based on staging + action = ( + util.as_url(post.distribution.stage_automate_selected, project=project, version=version) + if staging + else util.as_url(post.distribution.automate_selected, project=project, version=version) + ) + + # TODO: Reuse the same form for now - maybe we can combine this and the function below adding an automate=True arg + # Render the distribution form + form_html = form.render( + model_cls=shared.distribution.DistributeForm, + submit_label="Distribute", + action=action, + defaults={"package": project, "version": version}, + ) + block.append(form_html) + + return await template.blank(title, content=block.collect()) + + async def _record_form_page(project: str, version: str, staging: bool) -> str: """Helper to render the distribution recording form page.""" await shared.distribution.release_validated(project, version, staging=staging) @@ -126,23 +177,23 @@ async def _record_form_page(project: str, version: str, staging: bool) -> str: block = htm.Block() render.html_nav_phase(block, project, version, staging=staging) - title = "Record a staging distribution" if staging else "Record a manual distribution" + title = "Record a manual staging distribution" if staging else "Record a manual distribution" block.h1[title] block.p[ - "Record a distribution of ", + "Record a manual distribution of ", htm.strong[f"{project}-{version}"], " using the form below.", ] block.p[ "You can also ", - 
htm.a(href=util.as_url(list_get, project=project, version=version))["view the distribution list"], + htm.a(href=util.as_url(list_get, project_name=project, version_name=version))["view the distribution list"], ".", ] # Determine the action based on staging action = ( - util.as_url(post.distribution.stage_selected, project=project, version=version) + util.as_url(post.distribution.stage_record_selected, project=project, version=version) if staging else util.as_url(post.distribution.record_selected, project=project, version=version) ) diff --git a/atr/get/finish.py b/atr/get/finish.py index 921bf83..0447e17 100644 --- a/atr/get/finish.py +++ b/atr/get/finish.py @@ -19,6 +19,7 @@ import dataclasses import json import pathlib +from collections.abc import Sequence import aiofiles.os import asfquart.base as base @@ -42,6 +43,7 @@ import atr.mapping as mapping import atr.models.sql as sql import atr.render as render import atr.shared as shared +import atr.tasks.gha as gha import atr.template as template import atr.util as util import atr.web as web @@ -60,13 +62,9 @@ async def selected( ) -> tuple[web.QuartResponse, int] | web.WerkzeugResponse | str: """Finish a release preview.""" try: - ( - release, - source_files_rel, - target_dirs, - deletable_dirs, - rc_analysis, - ) = await _get_page_data(project_name, version_name) + (release, source_files_rel, target_dirs, deletable_dirs, rc_analysis, tasks) = await _get_page_data( + project_name, version_name + ) except ValueError: async with db.session() as data: release_fallback = await data.release( @@ -89,6 +87,7 @@ async def selected( target_dirs=target_dirs, deletable_dirs=deletable_dirs, rc_analysis=rc_analysis, + distribution_tasks=tasks, ) @@ -134,14 +133,31 @@ async def _deletable_choices( async def _get_page_data( project_name: str, version_name: str -) -> tuple[sql.Release, list[pathlib.Path], set[pathlib.Path], list[tuple[str, str]], RCTagAnalysisResult]: +) -> tuple[ + sql.Release, list[pathlib.Path], 
set[pathlib.Path], list[tuple[str, str]], RCTagAnalysisResult, Sequence[sql.Task] +]: """Get all the data needed to render the finish page.""" async with db.session() as data: + via = sql.validate_instrumented_attribute release = await data.release( project_name=project_name, version=version_name, _committee=True, ).demand(base.ASFQuartException("Release does not exist", errorcode=404)) + tasks = [ + t + for t in ( + await data.task( + project_name=project_name, + version_name=version_name, + revision_number=release.latest_revision_number, + task_type=sql.TaskType.DISTRIBUTION_WORKFLOW, + ) + .order_by(sql.sqlmodel.desc(via(sql.Task.started))) + .all() + ) + if t.status in [sql.TaskStatus.QUEUED, sql.TaskStatus.ACTIVE, sql.TaskStatus.FAILED] + ] if release.phase != sql.ReleasePhase.RELEASE_PREVIEW: raise ValueError("Release is not in preview phase") @@ -151,7 +167,7 @@ async def _get_page_data( deletable_dirs = await _deletable_choices(latest_revision_dir, target_dirs) rc_analysis_result = await _analyse_rc_tags(latest_revision_dir) - return release, source_files_rel, target_dirs, deletable_dirs, rc_analysis_result + return release, source_files_rel, target_dirs, deletable_dirs, rc_analysis_result, tasks def _render_delete_directory_form(deletable_dirs: list[tuple[str, str]]) -> htm.Element: @@ -172,6 +188,74 @@ def _render_delete_directory_form(deletable_dirs: list[tuple[str, str]]) -> htm. return section.collect() +def _render_dist_warning() -> htm.Element: + """Render the alert about distribution tools.""" + return htm.div(".alert.alert-warning.mb-4", role="alert")[ + htm.p(".fw-semibold.mb-1")["NOTE:"], + htm.p(".mb-1")[ + "Tools to distribute automatically are still being developed, " + "you must do this manually at present. 
Please use the manual record function below to do so.", + ], + ] + + +def _render_distribution_buttons(release: sql.Release) -> htm.Element: + """Render the distribution tool buttons.""" + return htm.div()[ + htm.p(".mb-1")[ + htm.a( + ".btn.btn-primary.me-2", + href=util.as_url( + distribution.automate, + project=release.project.name, + version=release.version, + ), + )["Distribute"], + htm.a( + ".btn.btn-secondary.me-2", + href=util.as_url( + distribution.record, + project=release.project.name, + version=release.version, + ), + )["Record a manual distribution"], + ], + ] + + +def _render_distribution_tasks(release: sql.Release, tasks: Sequence[sql.Task]) -> htm.Element: + """Render current and failed distribution tasks.""" + failed_tasks = [t for t in tasks if t.status == sql.TaskStatus.FAILED] + in_progress_tasks = [t for t in tasks if t.status in [sql.TaskStatus.QUEUED, sql.TaskStatus.ACTIVE]] + + block = htm.Block() + + if len(failed_tasks) > 0: + summary = f"{len(failed_tasks)} distribution{'s' if len(failed_tasks) > 1 else ''} failed for this release" + block.append( + htm.div(".alert.alert-danger.mb-3")[ + htm.h3["Failed distributions"], + htm.details[ + htm.summary[summary], + htm.div[*[_render_task(f) for f in failed_tasks]], + ], + ] + ) + if len(in_progress_tasks) > 0: + block.append( + htm.div(".alert.alert-info.mb-3")[ + htm.h3["In-progress distributions"], + htm.p["One or more automatic distributions are still in-progress:"], + *[_render_task(f) for f in in_progress_tasks], + htm.button( + ".btn.btn-success.me-2", + {"onclick": "window.location.reload()"}, + )["Refresh"], + ] + ) + return block.collect() + + def _render_move_section(max_files_to_show: int = 10) -> htm.Element: """Render the move files section with JavaScript interaction.""" section = htm.Block() @@ -247,6 +331,7 @@ async def _render_page( target_dirs: set, deletable_dirs: list[tuple[str, str]], rc_analysis: RCTagAnalysisResult, + distribution_tasks: Sequence[sql.Task], ) -> str: 
"""Render the finish page using htm.py.""" page = htm.Block() @@ -275,8 +360,11 @@ async def _render_page( "such as Maven Central, PyPI, or Docker Hub." ] - # TODO alert - page.append(_render_todo_alert(release)) + if len(distribution_tasks) > 0: + page.append(_render_distribution_tasks(release, distribution_tasks)) + + page.append(_render_dist_warning()) + page.append(_render_distribution_buttons(release)) # Move files section page.append(_render_move_section(max_files_to_show=10)) @@ -401,7 +489,7 @@ def _render_release_card(release: sql.Release) -> htm.Element: version_name=release.version, ), )[ - htpy.i(".bi.bi-download"), + htm.icon("download"), " Download all files", ], htm.a( @@ -413,7 +501,7 @@ def _render_release_card(release: sql.Release) -> htm.Element: version_name=release.version, ), )[ - htpy.i(".bi.bi-archive"), + htm.icon("archive"), " Show files", ], htm.a( @@ -425,7 +513,7 @@ def _render_release_card(release: sql.Release) -> htm.Element: version_name=release.version, ), )[ - htpy.i(".bi.bi-clock-history"), + htm.icon("clock-history"), " Show revisions", ], htm.a( @@ -437,7 +525,7 @@ def _render_release_card(release: sql.Release) -> htm.Element: version_name=release.version, ), )[ - htpy.i(".bi.bi-check-circle"), + htm.icon("check-circle"), " Announce and distribute", ], ], @@ -446,22 +534,12 @@ def _render_release_card(release: sql.Release) -> htm.Element: return card -def _render_todo_alert(release: sql.Release) -> htm.Element: - """Render the TODO alert about distribution tools.""" - return htm.div(".alert.alert-warning.mb-4", role="alert")[ - htm.p(".fw-semibold.mb-1")["TODO"], - htm.p(".mb-1")[ - "We plan to add tools to help release managers to distribute release artifacts on distribution networks. " - "Currently you must do this manually. 
Once you've distributed your release artifacts, you can ", - htm.a( - href=util.as_url( - distribution.record, - project=release.project.name, - version=release.version, - ) - )["record them on the ATR"], - ".", - ], +def _render_task(task: sql.Task) -> htm.Element: + """Render a distribution task's details.""" + args: gha.DistributionWorkflow = gha.DistributionWorkflow.model_validate(task.task_args) + status = task.status.value + return htm.p[ + f"{args.platform} ({args.package} {args.version}): {task.error if task.error else status.capitalize()}" ] diff --git a/atr/models/api.py b/atr/models/api.py index d06c5d3..4be4f96 100644 --- a/atr/models/api.py +++ b/atr/models/api.py @@ -67,6 +67,36 @@ class CommitteesListResults(schema.Strict): committees: Sequence[sql.Committee] +class DistributeSshRegisterArgs(schema.Strict): + publisher: str = schema.example("user") + jwt: str = schema.example("eyJhbGciOiJIUzI1[...]mMjLiuyu5CSpyHI=") + ssh_key: str = schema.example("ssh-ed25519 AAAAC3NzaC1lZDI1NTEgH5C9okWi0dh25AAAAIOMqqnkVzrm0SdG6UOoqKLsabl9GKJl") + phase: str = schema.Field(strict=False, default="compose", json_schema_extra={"examples": ["compose", "finish"]}) + version: str = schema.example("0.0.1") + + +class DistributeSshRegisterResults(schema.Strict): + endpoint: Literal["/distribute/ssh/register"] = schema.alias("endpoint") + fingerprint: str = schema.example("SHA256:0123456789abcdef0123456789abcdef01234567") + project: str = schema.example("example") + expires: int = schema.example(1713547200) + + +class DistributeStatusUpdateArgs(schema.Strict): + publisher: str = schema.example("user") + jwt: str = schema.example("eyJhbGciOiJIUzI1[...]mMjLiuyu5CSpyHI=") + workflow: str = schema.description("Workflow name") + run_id: int = schema.description("Workflow run ID") + project_name: str = schema.description("Project name in ATR") + status: str = schema.description("Workflow status") + message: str = schema.description("Workflow message") + + +class 
DistributeStatusUpdateResults(schema.Strict): + endpoint: Literal["/distribute/task/status"] = schema.alias("endpoint") + success: Literal[True] = schema.example(True) + + class DistributionRecordArgs(schema.Strict): project: str = schema.example("example") version: str = schema.example("0.0.1") @@ -592,6 +622,7 @@ validate_committee_keys = validator(CommitteeKeysResults) validate_committee_projects = validator(CommitteeProjectsResults) validate_committees_list = validator(CommitteesListResults) validate_distribution_record = validator(DistributionRecordResults) +validate_distribution_ssh_register = validator(DistributeSshRegisterResults) validate_ignore_add = validator(IgnoreAddResults) validate_ignore_delete = validator(IgnoreDeleteResults) validate_ignore_list = validator(IgnoreListResults) diff --git a/atr/models/results.py b/atr/models/results.py index 713291d..5dc1ce2 100644 --- a/atr/models/results.py +++ b/atr/models/results.py @@ -24,10 +24,10 @@ import atr.sbom.models.osv as osv from . 
import schema -class GithubActionsWorkflow(schema.Strict): +class DistributionWorkflow(schema.Strict): """Result of the task to run a Github workflow.""" - kind: Literal["github_actions_workflow"] = schema.Field(alias="kind") + kind: Literal["distribution_workflow"] = schema.Field(alias="kind") name: str = schema.description("The name of the action being performed") run_id: int = schema.description("The ID of the workflow run") url: str = schema.description("The URL of the workflow run") @@ -193,7 +193,7 @@ class MetadataUpdate(schema.Strict): Results = Annotated[ - GithubActionsWorkflow + DistributionWorkflow | HashingCheck | MessageSend | MetadataUpdate diff --git a/atr/models/sql.py b/atr/models/sql.py index 32a56f2..df0e461 100644 --- a/atr/models/sql.py +++ b/atr/models/sql.py @@ -53,6 +53,7 @@ sqlmodel.SQLModel.metadata = sqlalchemy.MetaData( @dataclasses.dataclass(frozen=True) class DistributionPlatformValue: name: str + gh_slug: str template_url: str template_staging_url: str | None = None requires_owner_namespace: bool = False @@ -95,12 +96,14 @@ class CheckResultStatusIgnore(str, enum.Enum): class DistributionPlatform(enum.Enum): ARTIFACT_HUB = DistributionPlatformValue( name="Artifact Hub", + gh_slug="artifacthub", template_url="https://artifacthub.io/api/v1/packages/helm/{owner_namespace}/{package}/{version}", template_staging_url="https://staging.artifacthub.io/api/v1/packages/helm/{owner_namespace}/{package}/{version}", requires_owner_namespace=True, ) DOCKER_HUB = DistributionPlatformValue( name="Docker Hub", + gh_slug="dockerhub", template_url="https://hub.docker.com/v2/namespaces/{owner_namespace}/repositories/{package}/tags/{version}", # TODO: Need to use staging tags? 
# template_staging_url="https://hub.docker.com/v2/namespaces/{owner_namespace}/repositories/{package}/tags/{version}", @@ -108,6 +111,7 @@ class DistributionPlatform(enum.Enum): ) # GITHUB = DistributionPlatformValue( # name="GitHub", + # gh_slug="github", # template_url="https://api.github.com/repos/{owner_namespace}/{package}/releases/tags/v{version}", # # Combine with {"prerelease": true} # template_staging_url="https://api.github.com/repos/{owner_namespace}/{package}/releases", @@ -115,6 +119,7 @@ class DistributionPlatform(enum.Enum): # ) MAVEN = DistributionPlatformValue( name="Maven Central", + gh_slug="maven", template_url="https://search.maven.org/solrsearch/select?q=g:{owner_namespace}+AND+a:{package}+AND+v:{version}&core=gav&rows=20&wt=json", # Java ASF projects use staging URLs along the lines of # https://repository.apache.org/content/repositories/orgapachePROJECT-NNNN/ @@ -123,17 +128,20 @@ class DistributionPlatform(enum.Enum): ) NPM = DistributionPlatformValue( name="npm", + gh_slug="npm", # TODO: Need to parse dist-tags template_url="https://registry.npmjs.org/{package}", ) NPM_SCOPED = DistributionPlatformValue( name="npm (scoped)", + gh_slug="npm", # TODO: Need to parse dist-tags template_url="https://registry.npmjs.org/@{owner_namespace}/{package}", requires_owner_namespace=True, ) PYPI = DistributionPlatformValue( name="PyPI", + gh_slug="pypi", template_url="https://pypi.org/pypi/{package}/{version}/json", template_staging_url="https://test.pypi.org/pypi/{package}/{version}/json", ) @@ -179,7 +187,7 @@ class TaskStatus(str, enum.Enum): class TaskType(str, enum.Enum): - GITHUB_ACTION_WORKFLOW = "github_action_workflow" + DISTRIBUTION_WORKFLOW = "distribution_workflow" HASHING_CHECK = "hashing_check" KEYS_IMPORT_FILE = "keys_import_file" LICENSE_FILES = "license_files" @@ -353,6 +361,8 @@ class Task(sqlmodel.SQLModel, table=True): result: results.Results | None = sqlmodel.Field(default=None, sa_column=sqlalchemy.Column(ResultsJSON)) error: str | 
None = None + workflow: "WorkflowStatus" = sqlmodel.Relationship(back_populates="task") + # Used for check tasks # We don't put these in task_args because we want to query them efficiently project_name: str | None = sqlmodel.Field(default=None, foreign_key="project.name") @@ -1157,6 +1167,17 @@ class Revision(sqlmodel.SQLModel, table=True): ) +# WorkflowStatus: +class WorkflowStatus(sqlmodel.SQLModel, table=True): + workflow_id: str = sqlmodel.Field(primary_key=True, index=True) + run_id: int = sqlmodel.Field(primary_key=True, index=True) + project_name: str = sqlmodel.Field(index=True) + task_id: int | None = sqlmodel.Field(default=None, foreign_key="task.id", ondelete="SET NULL") + task: Task = sqlmodel.Relationship(back_populates="workflow") + status: str = sqlmodel.Field() + message: str | None = sqlmodel.Field(default=None) + + def revision_name(release_name: str, number: str) -> str: return f"{release_name} {number}" diff --git a/atr/post/distribution.py b/atr/post/distribution.py index 15fa395..f5aea27 100644 --- a/atr/post/distribution.py +++ b/atr/post/distribution.py @@ -17,6 +17,8 @@ from __future__ import annotations +from typing import Final + import atr.blueprints.post as post import atr.db as db import atr.get as get @@ -25,6 +27,83 @@ import atr.shared as shared import atr.storage as storage import atr.web as web +_AUTOMATED_PLATFORMS: Final[tuple[shared.distribution.DistributionPlatform, ...]] = ( + shared.distribution.DistributionPlatform.MAVEN, +) +_AUTOMATED_PLATFORMS_STAGE: Final[tuple[shared.distribution.DistributionPlatform, ...]] = ( + shared.distribution.DistributionPlatform.MAVEN, +) + + +async def automate_form_process_page( + session: web.Committer, + form_data: shared.distribution.DistributeForm, + project: str, + version: str, + /, + staging: bool = False, +) -> web.WerkzeugResponse: + allowed_platforms = _AUTOMATED_PLATFORMS_STAGE if staging else _AUTOMATED_PLATFORMS + if form_data.platform not in allowed_platforms: + platform_str = 
form_data.platform.value + return await session.redirect( + get.distribution.stage_automate if staging else get.distribution.automate, + project=project, + version=version, + error=f"Platform {platform_str} is not supported for automated distribution", + ) + sql_platform = form_data.platform.to_sql() # type: ignore[attr-defined] + dd = distribution.Data( + platform=sql_platform, + owner_namespace=form_data.owner_namespace, + package=form_data.package, + version=form_data.version, + details=form_data.details, + ) + release, committee = await shared.distribution.release_validated_and_committee( + project, version, staging=staging, release_policy=True + ) + + async with storage.write_as_committee_member(committee_name=committee.name) as w: + try: + await w.distributions.automate( + release.name, + dd.platform, + committee.name, + dd.owner_namespace, + project, + version, + release.latest_revision_number, + dd.package, + dd.version, + staging, + ) + except storage.AccessError as e: + # Instead of calling record_form_page_new, redirect with error message + return await session.redirect( + get.distribution.stage_automate if staging else get.distribution.automate, + project=project, + version=version, + error=str(e), + ) + + # Success - redirect to distribution list with success message + message = "Distribution queued successfully." 
+ return await session.redirect( + get.distribution.list_get if staging else get.finish.selected, + project_name=project, + version_name=version, + success=message, + ) + + [email protected]("/distribution/automate/<project>/<version>") [email protected](shared.distribution.DistributeForm) +async def automate_selected( + session: web.Committer, distribute_form: shared.distribution.DistributeForm, project: str, version: str +) -> web.WerkzeugResponse: + return await automate_form_process_page(session, distribute_form, project, version, staging=False) + @post.committer("/distribution/delete/<project>/<version>") @post.form(shared.distribution.DeleteForm) @@ -53,8 +132,8 @@ async def delete( ) return await session.redirect( get.distribution.list_get, - project=project, - version=version, + project_name=project, + version_name=version, success="Distribution deleted", ) @@ -84,14 +163,14 @@ async def record_form_process_page( async with storage.write_as_committee_member(committee_name=committee.name) as w: try: _dist, added, _metadata = await w.distributions.record_from_data( - release=release, + release_name=release.name, staging=staging, dd=dd, ) except storage.AccessError as e: # Instead of calling record_form_page_new, redirect with error message return await session.redirect( - get.distribution.stage if staging else get.distribution.record, + get.distribution.stage_record if staging else get.distribution.record, project=project, version=version, error=str(e), @@ -101,8 +180,8 @@ async def record_form_process_page( message = "Distribution recorded successfully." if added else "Distribution was already recorded." 
return await session.redirect( get.distribution.list_get, - project=project, - version=version, + project_name=project, + version_name=version, success=message, ) @@ -115,9 +194,17 @@ async def record_selected( return await record_form_process_page(session, distribute_form, project, version, staging=False) [email protected]("/distribution/stage/<project>/<version>") [email protected]("/distribution/stage/automate/<project>/<version>") [email protected](shared.distribution.DistributeForm) +async def stage_automate_selected( + session: web.Committer, distribute_form: shared.distribution.DistributeForm, project: str, version: str +) -> web.WerkzeugResponse: + return await automate_form_process_page(session, distribute_form, project, version, staging=True) + + [email protected]("/distribution/stage/record/<project>/<version>") @post.form(shared.distribution.DistributeForm) -async def stage_selected( +async def stage_record_selected( session: web.Committer, distribute_form: shared.distribution.DistributeForm, project: str, version: str ) -> web.WerkzeugResponse: return await record_form_process_page(session, distribute_form, project, version, staging=True) diff --git a/atr/shared/distribution.py b/atr/shared/distribution.py index 444d066..743c781 100644 --- a/atr/shared/distribution.py +++ b/atr/shared/distribution.py @@ -132,10 +132,7 @@ def html_tr_a(label: str, value: str | None) -> htm.Element: async def release_validated( - project: str, - version: str, - committee: bool = False, - staging: bool | None = None, + project: str, version: str, committee: bool = False, staging: bool | None = None, release_policy: bool = False ) -> sql.Release: match staging: case True: @@ -149,6 +146,7 @@ async def release_validated( project_name=project, version=version, _committee=committee, + _release_policy=release_policy, ).demand(RuntimeError(f"Release {project} {version} not found")) if release.phase not in phase: raise RuntimeError(f"Release {project} {version} is not in 
{phase}") @@ -158,12 +156,9 @@ async def release_validated( async def release_validated_and_committee( - project: str, - version: str, - *, - staging: bool | None = None, + project: str, version: str, *, staging: bool | None = None, release_policy: bool = False ) -> tuple[sql.Release, sql.Committee]: - release = await release_validated(project, version, committee=True, staging=staging) + release = await release_validated(project, version, committee=True, staging=staging, release_policy=release_policy) committee = release.committee if committee is None: raise RuntimeError(f"Release {project} {version} has no committee") diff --git a/atr/storage/__init__.py b/atr/storage/__init__.py index cd77e89..3a7305a 100644 --- a/atr/storage/__init__.py +++ b/atr/storage/__init__.py @@ -218,6 +218,7 @@ class WriteAsCommitteeMember(WriteAsCommitteeParticipant): self.ssh = writers.ssh.CommitteeMember(write, self, data, committee_name) self.tokens = writers.tokens.CommitteeMember(write, self, data, committee_name) self.vote = writers.vote.CommitteeMember(write, self, data, committee_name) + self.workflowstatus = writers.workflowstatus.CommitteeMember(write, self, data, committee_name) @property def asf_uid(self) -> str: diff --git a/atr/storage/writers/__init__.py b/atr/storage/writers/__init__.py index 8c98552..27fe59d 100644 --- a/atr/storage/writers/__init__.py +++ b/atr/storage/writers/__init__.py @@ -30,6 +30,7 @@ import atr.storage.writers.sbom as sbom import atr.storage.writers.ssh as ssh import atr.storage.writers.tokens as tokens import atr.storage.writers.vote as vote +import atr.storage.writers.workflowstatus as workflowstatus __all__ = [ "announce", @@ -45,4 +46,5 @@ __all__ = [ "ssh", "tokens", "vote", + "workflowstatus", ] diff --git a/atr/storage/writers/distributions.py b/atr/storage/writers/distributions.py index 2db5d85..d74404b 100644 --- a/atr/storage/writers/distributions.py +++ b/atr/storage/writers/distributions.py @@ -31,6 +31,8 @@ import 
atr.models.distribution as distribution import atr.models.sql as sql import atr.storage as storage import atr.storage.outcome as outcome +import atr.tasks.gha as gha +import atr.util as util class GeneralPublic: @@ -95,6 +97,46 @@ class CommitteeMember(CommitteeParticipant): self.__asf_uid = asf_uid self.__committee_name = committee_name + async def automate( + self, + release_name: str, + platform: sql.DistributionPlatform, + committee_name: str, + owner_namespace: str | None, + project_name: str, + version_name: str, + revision_number: str | None, + package: str, + version: str, + staging: bool, + ) -> sql.Task: + dist_task = sql.Task( + task_type=sql.TaskType.DISTRIBUTION_WORKFLOW, + task_args=gha.DistributionWorkflow( + name=release_name, + namespace=owner_namespace or "", + package=package, + version=version, + project_name=project_name, + version_name=version_name, + platform=platform.name, + staging=staging, + asf_uid=self.__asf_uid, + committee_name=committee_name, + arguments={}, + ).model_dump(), + asf_uid=util.unwrap(self.__asf_uid), + added=datetime.datetime.now(datetime.UTC), + status=sql.TaskStatus.QUEUED, + project_name=project_name, + version_name=version_name, + revision_number=revision_number, + ) + self.__data.add(dist_task) + await self.__data.commit() + await self.__data.refresh(dist_task) + return dist_task + async def record( self, release_name: str, @@ -148,7 +190,7 @@ class CommitteeMember(CommitteeParticipant): async def record_from_data( self, - release: sql.Release, + release_name: str, staging: bool, dd: distribution.Data, ) -> tuple[sql.Distribution, bool, distribution.Metadata]: @@ -176,7 +218,7 @@ class CommitteeMember(CommitteeParticipant): web_url=web_url, ) dist, added = await self.record( - release_name=release.name, + release_name=release_name, platform=dd.platform, owner_namespace=dd.owner_namespace, package=dd.package, diff --git a/atr/storage/writers/workflowstatus.py b/atr/storage/writers/workflowstatus.py new file mode 
100644 index 0000000..2aa92cf --- /dev/null +++ b/atr/storage/writers/workflowstatus.py @@ -0,0 +1,134 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Removing this will cause circular imports +from __future__ import annotations + +import atr.db as db +import atr.models.sql as sql +import atr.storage as storage + + +class GeneralPublic: + def __init__( + self, + write: storage.Write, + write_as: storage.WriteAsGeneralPublic, + data: db.Session, + ): + self.__write = write + self.__write_as = write_as + self.__data = data + self.__asf_uid = write.authorisation.asf_uid + + +class FoundationCommitter(GeneralPublic): + def __init__(self, write: storage.Write, write_as: storage.WriteAsFoundationCommitter, data: db.Session): + super().__init__(write, write_as, data) + self.__write = write + self.__write_as = write_as + self.__data = data + asf_uid = write.authorisation.asf_uid + if asf_uid is None: + raise storage.AccessError("No ASF UID") + self.__asf_uid = asf_uid + + # async def add_key(self, key: str, asf_uid: str) -> str: + # fingerprint = util.key_ssh_fingerprint(key) + # self.__data.add(sql.SSHKey(fingerprint=fingerprint, key=key, asf_uid=asf_uid)) + # await self.__data.commit() + # return fingerprint + # + # async def 
delete_key(self, fingerprint: str) -> None: + # ssh_key = await self.__data.ssh_key( + # fingerprint=fingerprint, + # asf_uid=self.__asf_uid, + # ).demand(storage.AccessError(f"Key not found: {fingerprint}")) + # await self.__data.delete(ssh_key) + # await self.__data.commit() + + +class CommitteeParticipant(FoundationCommitter): + def __init__( + self, + write: storage.Write, + write_as: storage.WriteAsCommitteeParticipant, + data: db.Session, + committee_name: str, + ): + super().__init__(write, write_as, data) + self.__write = write + self.__write_as = write_as + self.__data = data + asf_uid = write.authorisation.asf_uid + if asf_uid is None: + raise storage.AccessError("No ASF UID") + self.__asf_uid = asf_uid + self.__committee_name = committee_name + + +class CommitteeMember(CommitteeParticipant): + def __init__( + self, + write: storage.Write, + write_as: storage.WriteAsCommitteeMember, + data: db.Session, + committee_name: str, + ): + super().__init__(write, write_as, data, committee_name) + self.__write = write + self.__write_as = write_as + self.__data = data + asf_uid = write.authorisation.asf_uid + if asf_uid is None: + raise storage.AccessError("No ASF UID") + self.__asf_uid = asf_uid + self.__committee_name = committee_name + + async def add_workflow_status( + self, + workflow_id: str, + run_id: int, + project_name: str, + task_id: int | None = None, + status: str | None = None, + message: str | None = None, + ) -> sql.WorkflowStatus: + # now = int(time.time()) + # # Twenty minutes to upload all files + # ttl = 20 * 60 + # expires = now + ttl + # fingerprint = util.key_ssh_fingerprint(key) + ws = sql.WorkflowStatus( + workflow_id=workflow_id, + run_id=run_id, + project_name=project_name, + task_id=task_id, + status=status or "", + message=message, + ) + self.__data.add(ws) + await self.__data.commit() + self.__write_as.append_to_audit_log( + workflow_id=workflow_id, + run_id=run_id, + project_name=project_name, + task_id=task_id, + status=status, + 
message=message, + ) + return ws diff --git a/atr/tasks/__init__.py b/atr/tasks/__init__.py index dde92ed..9d89e31 100644 --- a/atr/tasks/__init__.py +++ b/atr/tasks/__init__.py @@ -189,7 +189,7 @@ def queued( def resolve(task_type: sql.TaskType) -> Callable[..., Awaitable[results.Results | None]]: # noqa: C901 match task_type: - case sql.TaskType.GITHUB_ACTION_WORKFLOW: + case sql.TaskType.DISTRIBUTION_WORKFLOW: return gha.trigger_workflow case sql.TaskType.HASHING_CHECK: return hashing.check diff --git a/atr/tasks/gha.py b/atr/tasks/gha.py index f64b2cc..09741bc 100644 --- a/atr/tasks/gha.py +++ b/atr/tasks/gha.py @@ -26,60 +26,87 @@ import atr.config as config import atr.log as log import atr.models.results as results import atr.models.schema as schema +import atr.models.sql as sql + +# import atr.shared as shared +import atr.storage as storage import atr.tasks.checks as checks _BASE_URL: Final[str] = "https://api.github.com/repos" _IN_PROGRESS_STATUSES: Final[list[str]] = ["in_progress", "queued", "requested", "waiting", "pending", "expected"] _COMPLETED_STATUSES: Final[list[str]] = ["completed"] _FAILED_STATUSES: Final[list[str]] = ["failure", "startup_failure"] -_TIMEOUT_S = 5 +_TIMEOUT_S = 60 -class GithubActionsWorkflow(schema.Strict): +class DistributionWorkflow(schema.Strict): """Arguments for the task to start a Github Actions workflow.""" - owner: str = schema.description("Github owner of the repository") - repo: str = schema.description("Repository in which to start the workflow") - workflow_id: str = schema.description("Workflow ID") - ref: str = schema.description("Git ref to trigger the workflow") + namespace: str = schema.description("Namespace to distribute to") + package: str = schema.description("Package to distribute") + version: str = schema.description("Version to distribute") + staging: bool = schema.description("Whether this is a staging distribution") + project_name: str = schema.description("Project name in ATR") + version_name: str = 
schema.description("Version name in ATR") + asf_uid: str = schema.description("ASF UID of the user triggering the workflow") + committee_name: str = schema.description("Committee name in ATR") + platform: str = schema.description("Distribution platform") arguments: dict[str, str] = schema.description("Workflow arguments") name: str = schema.description("Name of the run") [email protected]_model(GithubActionsWorkflow) -async def trigger_workflow(args: GithubActionsWorkflow) -> results.Results | None: [email protected]_model(DistributionWorkflow) +async def trigger_workflow(args: DistributionWorkflow, *, task_id: int | None = None) -> results.Results | None: unique_id = f"{args.name}-{uuid.uuid4()}" - payload = {"ref": args.ref, "inputs": {"atr-id": unique_id, **args.arguments}} + try: + sql_platform = sql.DistributionPlatform[args.platform] + except KeyError: + _fail(f"Invalid platform: {args.platform}") + workflow = f"distribute-{sql_platform.value.gh_slug}.yml" + payload = { + "ref": "main", + "inputs": { + "atr-id": unique_id, + "project": args.project_name, + "version": args.version_name, + "distribution-owner-namespace": args.namespace, + "distribution-package": args.package, + "distribution-version": args.version, + "staging": "true" if args.staging else "false", + **args.arguments, + }, + } headers = {"Accept": "application/vnd.github+json", "Authorization": f"Bearer {config.get().GITHUB_TOKEN}"} log.info( - f"Triggering Github workflow {args.owner}/{args.repo}/{args.workflow_id} with args: { + f"Triggering Github workflow apache/tooling-actions/{workflow} with args: { json.dumps(args.arguments, indent=2) }" ) async with aiohttp.ClientSession() as session: try: async with session.post( - f"{_BASE_URL}/{args.owner}/{args.repo}/actions/workflows/{args.workflow_id}/dispatches", + f"{_BASE_URL}/apache/tooling-actions/actions/workflows/{workflow}/dispatches", headers=headers, json=payload, ) as response: response.raise_for_status() except 
aiohttp.ClientResponseError as e: - _fail(f"Failed to trigger workflow run: {e.message} ({e.status})") - - run, run_id = await _find_triggered_run(session, args, headers, unique_id) + _fail(f"Failed to trigger GitHub workflow: {e.message} ({e.status})") - if run.get("status") in _IN_PROGRESS_STATUSES: - run = await _wait_for_completion(session, args, headers, run_id, unique_id) + run, run_id = await _find_triggered_run(session, headers, unique_id) if run.get("status") in _FAILED_STATUSES: - _fail(f"Github workflow {args.owner}/{args.repo}/{args.workflow_id} run {run_id} failed with error") - if run.get("status") in _COMPLETED_STATUSES: - log.info(f"Workflow {args.owner}/{args.repo}/{args.workflow_id} run {run_id} completed successfully") - return results.GithubActionsWorkflow( - kind="github_actions_workflow", name=args.name, run_id=run_id, url=run.get("html_url", "") - ) - _fail(f"Timed out waiting for workflow {args.owner}/{args.repo}/{args.workflow_id}") + _fail(f"Github workflow apache/tooling-actions/{workflow} run {run_id} failed with error") + async with storage.write_as_committee_member(args.committee_name, args.asf_uid) as w: + try: + await w.workflowstatus.add_workflow_status( + workflow, run_id, args.project_name, task_id, status=run.get("status") + ) + except storage.AccessError as e: + _fail(f"Failed to record distribution: {e}") + return results.DistributionWorkflow( + kind="distribution_workflow", name=args.name, run_id=run_id, url=run.get("html_url", "") + ) def _fail(message: str) -> NoReturn: @@ -89,7 +116,6 @@ def _fail(message: str) -> NoReturn: async def _find_triggered_run( session: aiohttp.ClientSession, - args: GithubActionsWorkflow, headers: dict[str, str], unique_id: str, ) -> tuple[dict[str, Any], int]: @@ -97,12 +123,12 @@ async def _find_triggered_run( def get_run(resp: dict[str, Any]) -> dict[str, Any] | None: return next( - (r for r in resp["workflow_runs"] if (r["head_branch"] == args.ref) and (r["name"] == unique_id)), + (r for r in 
resp["workflow_runs"] if (r["head_branch"] == "main") and (r["name"] == unique_id)), None, ) run = await _request_and_retry( - session, f"{_BASE_URL}/{args.owner}/{args.repo}/actions/runs?event=workflow_dispatch", headers, get_run + session, f"{_BASE_URL}/apache/tooling-actions/actions/runs?event=workflow_dispatch", headers, get_run ) if run is None: _fail(f"Failed to find triggered workflow run for {unique_id}") @@ -112,6 +138,31 @@ async def _find_triggered_run( return run, run_id +# +# async def _record_distribution( +# committee_name: str, +# release: str, +# platform: sql.DistributionPlatform, +# namespace: str, +# package: str, +# version: str, +# staging: bool, +# ): +# log.info("Creating distribution record") +# dd = distribution.Data( +# platform=platform, +# owner_namespace=namespace, +# package=package, +# version=version, +# details=False, +# ) +# async with storage.write_as_committee_member(committee_name=committee_name) as w: +# try: +# _dist, _added, _metadata = await w.distributions.record_from_data(release=release, staging=staging, dd=dd) +# except storage.AccessError as e: +# _fail(f"Failed to record distribution: {e}") + + async def _request_and_retry( session: aiohttp.client.ClientSession, url: str, @@ -132,29 +183,30 @@ async def _request_and_retry( else: return data except aiohttp.ClientResponseError as e: - # We don't raise here as it could be an emphemeral error - if it continues it will return None + # We don't raise here as it could be an ephemeral error - if it continues it will return None log.error(f"Failure calling Github: {e.message} ({e.status}, attempt {_attempt + 1})") await asyncio.sleep(0.1) return None -async def _wait_for_completion( - session: aiohttp.ClientSession, - args: GithubActionsWorkflow, - headers: dict[str, str], - run_id: int, - unique_id: str, -) -> dict[str, Any]: - """Wait for a workflow run to complete.""" - - def filter_run(resp: dict[str, Any]) -> dict[str, Any] | None: - if resp.get("status") not in 
_IN_PROGRESS_STATUSES: - return resp - return None - - run = await _request_and_retry( - session, f"{_BASE_URL}/{args.owner}/{args.repo}/actions/runs/{run_id}", headers, filter_run - ) - if run is None: - _fail(f"Failed to find triggered workflow run for {unique_id}") - return run +# +# async def _wait_for_completion( +# session: aiohttp.ClientSession, +# args: DistributionWorkflow, +# headers: dict[str, str], +# run_id: int, +# unique_id: str, +# ) -> dict[str, Any]: +# """Wait for a workflow run to complete.""" +# +# def filter_run(resp: dict[str, Any]) -> dict[str, Any] | None: +# if resp.get("status") not in _IN_PROGRESS_STATUSES: +# return resp +# return None +# +# run = await _request_and_retry( +# session, f"{_BASE_URL}/{args.owner}/{args.repo}/actions/runs/{run_id}", headers, filter_run +# ) +# if run is None: +# _fail(f"Failed to find triggered workflow run for {unique_id}") +# return run diff --git a/atr/templates/check-selected.html b/atr/templates/check-selected.html index 0bb1534..77b793d 100644 --- a/atr/templates/check-selected.html +++ b/atr/templates/check-selected.html @@ -125,9 +125,19 @@ </div> </div> {% if phase == "release_candidate_draft" %} + <h3 id="distribution" class="mt-4">Distribution</h3> + <p> + While this release is in draft, you can create a staging distribution. Use the buttons below to either create one automatically (where supported) or record a manual distribution performed outside of ATR. + </p> + <div class="alert alert-warning mb-4"> + <p class="fw-semibold mb-1">NOTE:</p> + <p>At present, automated distributions are being developed. 
Please use the manual record button if you need to record a distribution.</p> + </div> <p> <a class="btn btn-primary" - href="{{ as_url(get.distribution.stage, project=release.project.name, version=release.version) }}">Record a distribution</a> + href="{{ as_url(get.distribution.stage_automate, project=release.project.name, version=release.version) }}">Distribute</a> + <a class="btn btn-secondary" + href="{{ as_url(get.distribution.stage_record, project=release.project.name, version=release.version) }}">Record a manual distribution</a> </p> <h2 id="more-actions">More actions</h2> <h3 id="ignored-checks" class="mt-4">Ignored checks</h3> diff --git a/atr/worker.py b/atr/worker.py index d2ba596..79e2823 100644 --- a/atr/worker.py +++ b/atr/worker.py @@ -204,7 +204,10 @@ async def _task_process(task_id: int, task_type: str, task_args: list[str] | dic handler_result = await handler(function_arguments) else: # Otherwise, it's not a check handler - handler_result = await handler(task_args) + if sig.parameters.get("task_id") is None: + handler_result = await handler(task_args) + else: + handler_result = await handler(task_args, task_id=task_id) task_results = handler_result status = task.COMPLETED diff --git a/migrations/versions/0037_2026.01.13_0cefcaea.py b/migrations/versions/0037_2026.01.13_0cefcaea.py new file mode 100644 index 0000000..4d3e58e --- /dev/null +++ b/migrations/versions/0037_2026.01.13_0cefcaea.py @@ -0,0 +1,46 @@ +"""Add status for external workflows + +Revision ID: 0037_2026.01.13_0cefcaea +Revises: 0036_2026.01.12_3831f215 +Create Date: 2026-01-13 14:36:37.322569+00:00 +""" + +from collections.abc import Sequence + +import sqlalchemy as sa +from alembic import op + +# Revision identifiers, used by Alembic +revision: str = "0037_2026.01.13_0cefcaea" +down_revision: str | None = "0036_2026.01.12_3831f215" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + op.create_table( + 
"workflowstatus", + sa.Column("workflow_id", sa.String(), nullable=False), + sa.Column("run_id", sa.Integer(), nullable=False), + sa.Column("project_name", sa.String(), nullable=False), + sa.Column("task_id", sa.Integer(), nullable=True), + sa.Column("status", sa.String(), nullable=False), + sa.Column("message", sa.String(), nullable=True), + sa.ForeignKeyConstraint( + ["task_id"], ["task.id"], name=op.f("fk_workflowstatus_task_id_task"), ondelete="SET NULL" + ), + sa.PrimaryKeyConstraint("workflow_id", "run_id", name=op.f("pk_workflowstatus")), + ) + with op.batch_alter_table("workflowstatus", schema=None) as batch_op: + batch_op.create_index(batch_op.f("ix_workflowstatus_project_name"), ["project_name"], unique=False) + batch_op.create_index(batch_op.f("ix_workflowstatus_run_id"), ["run_id"], unique=False) + batch_op.create_index(batch_op.f("ix_workflowstatus_workflow_id"), ["workflow_id"], unique=False) + + +def downgrade() -> None: + with op.batch_alter_table("workflowstatus", schema=None) as batch_op: + batch_op.drop_index(batch_op.f("ix_workflowstatus_workflow_id")) + batch_op.drop_index(batch_op.f("ix_workflowstatus_run_id")) + batch_op.drop_index(batch_op.f("ix_workflowstatus_project_name")) + + op.drop_table("workflowstatus") --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
