This is an automated email from the ASF dual-hosted git repository. arm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git
commit 34ae0a19e0fc5f4d575a6df9e871656754590ceb Author: Alastair McFarlane <[email protected]> AuthorDate: Thu Jan 15 11:33:03 2026 +0000 Add the ability to schedule future jobs. Worker claim updated to not claim future tasks. Metadata and workflow status tasks can schedule themselves. --- atr/db/__init__.py | 7 ++++ atr/models/results.py | 7 ++++ atr/models/sql.py | 1 + atr/server.py | 61 +++++++++++++++++++++------------- atr/tasks/__init__.py | 67 +++++++++++++++++++++++++++++++++++-- atr/tasks/gha.py | 91 +++++++++++++++++++++++++++++++++++++++++++++++++++ atr/tasks/metadata.py | 14 ++++++++ atr/worker.py | 7 +++- 8 files changed, 230 insertions(+), 25 deletions(-) diff --git a/atr/db/__init__.py b/atr/db/__init__.py index 050f307..8250a90 100644 --- a/atr/db/__init__.py +++ b/atr/db/__init__.py @@ -755,7 +755,10 @@ class Session(sqlalchemy.ext.asyncio.AsyncSession): run_id: Opt[int] = NOT_SET, project_name: Opt[str] = NOT_SET, task_id: Opt[int] = NOT_SET, + status: Opt[str] = NOT_SET, + status_in: Opt[list[str]] = NOT_SET, ) -> Query[sql.WorkflowStatus]: + via = sql.validate_instrumented_attribute query = sqlmodel.select(sql.WorkflowStatus) if is_defined(workflow_id): @@ -766,6 +769,10 @@ class Session(sqlalchemy.ext.asyncio.AsyncSession): query = query.where(sql.WorkflowStatus.project_name == project_name) if is_defined(task_id): query = query.where(sql.WorkflowStatus.task_id == task_id) + if is_defined(status): + query = query.where(sql.WorkflowStatus.status == status) + if is_defined(status_in): + query = query.where(via(sql.WorkflowStatus.status).in_(status_in)) return Query(self, query) diff --git a/atr/models/results.py b/atr/models/results.py index 5dc1ce2..6893f28 100644 --- a/atr/models/results.py +++ b/atr/models/results.py @@ -33,6 +33,12 @@ class DistributionWorkflow(schema.Strict): url: str = schema.description("The URL of the workflow run") +class DistributionWorkflowStatus(schema.Strict): + """Result of the task to update Github workflow 
statuses.""" + + kind: Literal["distribution_workflow_status"] = schema.Field(alias="kind") + + class HashingCheck(schema.Strict): """Result of the task to check the hash of a file.""" @@ -194,6 +200,7 @@ class MetadataUpdate(schema.Strict): Results = Annotated[ DistributionWorkflow + | DistributionWorkflowStatus | HashingCheck | MessageSend | MetadataUpdate diff --git a/atr/models/sql.py b/atr/models/sql.py index 2a58dde..0a80912 100644 --- a/atr/models/sql.py +++ b/atr/models/sql.py @@ -208,6 +208,7 @@ class TaskType(str, enum.Enum): TARGZ_INTEGRITY = "targz_integrity" TARGZ_STRUCTURE = "targz_structure" VOTE_INITIATE = "vote_initiate" + WORKFLOW_STATUS = "workflow_status" ZIPFORMAT_INTEGRITY = "zipformat_integrity" ZIPFORMAT_STRUCTURE = "zipformat_structure" diff --git a/atr/server.py b/atr/server.py index 4c68373..d64f7d1 100644 --- a/atr/server.py +++ b/atr/server.py @@ -207,9 +207,8 @@ def _app_setup_lifecycle(app: base.QuartApp) -> None: worker_manager = manager.get_worker_manager() await worker_manager.start() - # Start the metadata update scheduler - metadata_scheduler_task = asyncio.create_task(_metadata_update_scheduler()) - app.extensions["metadata_scheduler"] = metadata_scheduler_task + # Register recurring tasks (metadata updates, workflow status checks, etc.) + await _register_recurrent_tasks() await _initialise_test_environment() @@ -251,13 +250,13 @@ def _app_setup_lifecycle(app: base.QuartApp) -> None: await worker_manager.stop() # Stop the metadata scheduler - metadata_scheduler = app.extensions.get("metadata_scheduler") - if metadata_scheduler: - metadata_scheduler.cancel() - try: - await metadata_scheduler - except asyncio.CancelledError: - ... + # metadata_scheduler = app.extensions.get("metadata_scheduler") + # if metadata_scheduler: + # metadata_scheduler.cancel() + # try: + # await metadata_scheduler + # except asyncio.CancelledError: + # ... 
ssh_server = app.extensions.get("ssh_server") if ssh_server: @@ -412,6 +411,8 @@ def _create_app(app_config: type[config.AppConfig]) -> base.QuartApp: _app_setup_security_headers(app) _app_setup_lifecycle(app) + # _register_recurrent_tasks() + # do not enable template pre-loading if we explicitly want to reload templates if not app_config.TEMPLATES_AUTO_RELOAD: preload.setup_template_preloading(app) @@ -513,20 +514,36 @@ async def _initialise_test_environment() -> None: await data.commit() -async def _metadata_update_scheduler() -> None: - """Periodically schedule remote metadata updates.""" - # Wait one minute to allow the server to start - await asyncio.sleep(60) +# +# async def _metadata_update_scheduler() -> None: +# """Periodically schedule remote metadata updates.""" +# # Wait one minute to allow the server to start +# await asyncio.sleep(60) +# +# while True: +# try: +# task = await tasks.metadata_update(asf_uid="system") +# log.info(f"Scheduled remote metadata update with ID {task.id}") +# except Exception as e: +# log.exception(f"Failed to schedule remote metadata update: {e!s}") +# +# # Schedule next update in 24 hours +# await asyncio.sleep(86400) - while True: - try: - task = await tasks.metadata_update(asf_uid="system") - log.info(f"Scheduled remote metadata update with ID {task.id}") - except Exception as e: - log.exception(f"Failed to schedule remote metadata update: {e!s}") - # Schedule next update in 24 hours - await asyncio.sleep(86400) +async def _register_recurrent_tasks() -> None: + """Schedule recurring tasks""" + # Wait one minute to allow the server to start + await asyncio.sleep(30) + try: + await tasks.clear_scheduled() + metadata = await tasks.metadata_update(asf_uid="system", schedule_next=True) + log.info(f"Scheduled remote metadata update with ID {metadata.id}") + workflow = await tasks.workflow_update(asf_uid="system", schedule_next=True) + log.info(f"Scheduled workflow status update with ID {workflow.id}") + + except Exception as e: 
+ log.exception(f"Failed to schedule recurrent tasks: {e!s}") def _migrate_audit(state_dir: pathlib.Path) -> None: diff --git a/atr/tasks/__init__.py b/atr/tasks/__init__.py index 9d89e31..99f2328 100644 --- a/atr/tasks/__init__.py +++ b/atr/tasks/__init__.py @@ -15,9 +15,12 @@ # specific language governing permissions and limitations # under the License. +import datetime from collections.abc import Awaitable, Callable, Coroutine from typing import Any, Final +import sqlmodel + import atr.db as db import atr.models.results as results import atr.models.sql as sql @@ -57,6 +60,27 @@ async def asc_checks(asf_uid: str, release: sql.Release, revision: str, signatur return tasks +async def clear_scheduled(caller_data: db.Session | None = None): + """Clear all future scheduled tasks of the given types.""" + async with db.ensure_session(caller_data) as data: + via = sql.validate_instrumented_attribute + now = datetime.datetime.now(datetime.UTC) + + delete_stmt = sqlmodel.delete(sql.Task).where( + via(sql.Task.task_type).in_( + [ + sql.TaskType.METADATA_UPDATE, + sql.TaskType.WORKFLOW_STATUS, + ] + ), + via(sql.Task.status) == sql.TaskStatus.QUEUED, + via(sql.Task.added) > now, + ) + + await data.execute(delete_stmt) + await data.commit() + + async def draft_checks( asf_uid: str, project_name: str, release_version: str, revision_number: str, caller_data: db.Session | None = None ) -> int: @@ -150,17 +174,27 @@ async def keys_import_file( await data.commit() -async def metadata_update(asf_uid: str, caller_data: db.Session | None = None) -> sql.Task: +async def metadata_update( + asf_uid: str, + caller_data: db.Session | None = None, + schedule: datetime.datetime | None = None, + schedule_next: bool = False, +) -> sql.Task: """Queue a metadata update task.""" + args = metadata.Update(asf_uid=asf_uid, next_schedule=0) + if schedule_next: + args.next_schedule = 60 * 24 async with db.ensure_session(caller_data) as data: task = sql.Task( status=sql.TaskStatus.QUEUED, 
task_type=sql.TaskType.METADATA_UPDATE, - task_args=metadata.Update(asf_uid=asf_uid).model_dump(), + task_args=args.model_dump(), asf_uid=asf_uid, revision_number=None, primary_rel_path=None, ) + if schedule: + task.added = schedule data.add(task) await data.commit() await data.flush() @@ -227,6 +261,8 @@ def resolve(task_type: sql.TaskType) -> Callable[..., Awaitable[results.Results return targz.structure case sql.TaskType.VOTE_INITIATE: return vote.initiate + case sql.TaskType.WORKFLOW_STATUS: + return gha.status_check case sql.TaskType.ZIPFORMAT_INTEGRITY: return zipformat.integrity case sql.TaskType.ZIPFORMAT_STRUCTURE: @@ -259,6 +295,33 @@ async def tar_gz_checks(asf_uid: str, release: sql.Release, revision: str, path: return tasks +async def workflow_update( + asf_uid: str, + caller_data: db.Session | None = None, + schedule: datetime.datetime | None = None, + schedule_next: bool = False, +) -> sql.Task: + """Queue a workflow status update task.""" + args = gha.WorkflowStatusCheck(next_schedule=0, run_id=0) + if schedule_next: + args.next_schedule = 2 + async with db.ensure_session(caller_data) as data: + task = sql.Task( + status=sql.TaskStatus.QUEUED, + task_type=sql.TaskType.WORKFLOW_STATUS, + task_args=args.model_dump(), + asf_uid=asf_uid, + revision_number=None, + primary_rel_path=None, + ) + if schedule: + task.added = schedule + data.add(task) + await data.commit() + await data.flush() + return task + + async def zip_checks(asf_uid: str, release: sql.Release, revision: str, path: str) -> list[sql.Task]: """Create check tasks for a .zip file.""" # This release has committee, as guaranteed in draft_checks diff --git a/atr/tasks/gha.py b/atr/tasks/gha.py index 5e174c7..9ccaef8 100644 --- a/atr/tasks/gha.py +++ b/atr/tasks/gha.py @@ -15,14 +15,17 @@ # specific language governing permissions and limitations # under the License. 
import asyncio +import datetime import json import uuid from collections.abc import Callable from typing import Any, Final, NoReturn import aiohttp +import pydantic import atr.config as config +import atr.db as db import atr.log as log import atr.models.results as results import atr.models.schema as schema @@ -30,7 +33,9 @@ import atr.models.sql as sql # import atr.shared as shared import atr.storage as storage +import atr.tasks as tasks import atr.tasks.checks as checks +from atr.models.results import DistributionWorkflowStatus _BASE_URL: Final[str] = "https://api.github.com/repos" _IN_PROGRESS_STATUSES: Final[list[str]] = ["in_progress", "queued", "requested", "waiting", "pending", "expected"] @@ -56,6 +61,11 @@ class DistributionWorkflow(schema.Strict): name: str = schema.description("Name of the run") +class WorkflowStatusCheck(schema.Strict): + run_id: int | None = schema.description("Run ID") + next_schedule: int = pydantic.Field(default=0, description="The next scheduled time (in minutes)") + + @checks.with_model(DistributionWorkflow) async def trigger_workflow(args: DistributionWorkflow, *, task_id: int | None = None) -> results.Results | None: unique_id = f"atr-dist-{args.name}-{uuid.uuid4()}" @@ -112,6 +122,78 @@ async def trigger_workflow(args: DistributionWorkflow, *, task_id: int | None = ) +@checks.with_model(WorkflowStatusCheck) +async def status_check(args: WorkflowStatusCheck) -> DistributionWorkflowStatus: + """Check remote workflow statuses.""" + + headers = {"Accept": "application/vnd.github+json", "Authorization": f"Bearer {config.get().GITHUB_TOKEN}"} + log.info("Updating Github workflow statuses from apache/tooling-actions") + runs = [] + try: + async with aiohttp.ClientSession() as session: + try: + async with session.get( + f"{_BASE_URL}/apache/tooling-actions/actions/runs?event=workflow_dispatch", headers=headers + ) as response: + response.raise_for_status() + resp_json = await response.json() + runs = resp_json.get("workflow_runs",
[]) + except aiohttp.ClientResponseError as e: + _fail(f"Failed to lookup GitHub workflows: {e.message} ({e.status})") + + updated_count = 0 + + if len(runs) > 0: + async with db.session() as data: + pending_runs = await data.workflow_status(status_in=[*_IN_PROGRESS_STATUSES, ""]).all() + + for pending in pending_runs: + # Find matching workflow run from GitHub API + matching_run = next( + ( + r + for r in runs + if r.get("id") == pending.run_id and r.get("path", "").endswith(f"/{pending.workflow_id}") + ), + None, + ) + + if matching_run: + new_status = matching_run.get("status", "") + new_message = matching_run.get("conclusion") + if new_message == "failure": + new_status = "failed" + new_message = "GitHub workflow failed" + + # Update status if it has changed + if new_status != pending.status: + pending.status = new_status + if new_message: + pending.message = new_message + updated_count += 1 + log.info( + f"Updated workflow {pending.workflow_id} run {pending.run_id} to status {new_status}" + ) + # TODO: If we can't find this run ID in the bulk response, we could check it directly by ID in the API + await data.commit() + + log.info( + f"Workflow status update completed: updated {updated_count} workflow(s)", + ) + + # Schedule next update + await _schedule_next(args) + + return results.DistributionWorkflowStatus( + kind="distribution_workflow_status", + ) + + except aiohttp.ClientError as e: + _fail(f"Failed to fetch workflow data from GitHub: {e!s}") + except Exception as e: + _fail(f"Unexpected error during workflow status update: {e!s}") + + def _fail(message: str) -> NoReturn: log.error(message) raise RuntimeError(message) @@ -192,6 +274,15 @@ async def _request_and_retry( return None +async def _schedule_next(args: WorkflowStatusCheck): + if args.next_schedule: + next_schedule = datetime.datetime.now(datetime.UTC) + datetime.timedelta(minutes=args.next_schedule) + await tasks.workflow_update("system", schedule=next_schedule, schedule_next=True) + log.info( + 
f"Scheduled next workflow status update for: {next_schedule.strftime('%Y-%m-%d %H:%M:%S')}", + ) + + # # async def _wait_for_completion( # session: aiohttp.ClientSession, diff --git a/atr/tasks/metadata.py b/atr/tasks/metadata.py index 561afeb..ba2b6e5 100644 --- a/atr/tasks/metadata.py +++ b/atr/tasks/metadata.py @@ -15,12 +15,16 @@ # specific language governing permissions and limitations # under the License. +import datetime + import aiohttp +import pydantic import atr.datasources.apache as apache import atr.log as log import atr.models.results as results import atr.models.schema as schema +import atr.tasks as tasks import atr.tasks.checks as checks @@ -28,6 +32,7 @@ class Update(schema.Strict): """Arguments for the task to update metadata from remote data sources.""" asf_uid: str = schema.description("The ASF UID of the user triggering the update") + next_schedule: int = pydantic.Field(default=0, description="The next scheduled time (in minutes)") class UpdateError(Exception): @@ -45,6 +50,15 @@ async def update(args: Update) -> results.Results | None: log.info( f"Metadata update completed successfully: added {added_count}, updated {updated_count}", ) + + # Schedule next update + if args.next_schedule and args.next_schedule > 0: + next_schedule = datetime.datetime.now(datetime.UTC) + datetime.timedelta(minutes=args.next_schedule) + await tasks.metadata_update(args.asf_uid, schedule=next_schedule, schedule_next=True) + log.info( + f"Scheduled next metadata update for: {next_schedule.strftime('%Y-%m-%d %H:%M:%S')}", + ) + return results.MetadataUpdate( kind="metadata_update", added_count=added_count, diff --git a/atr/worker.py b/atr/worker.py index 79e2823..db3a9ac 100644 --- a/atr/worker.py +++ b/atr/worker.py @@ -114,7 +114,12 @@ async def _task_next_claim() -> tuple[int, str, list[str] | dict[str, Any]] | No # Get the ID of the oldest queued task oldest_queued_task = ( sqlmodel.select(sql.Task.id) - .where(sql.Task.status == task.QUEUED) + .where( + 
sqlmodel.and_( + sql.Task.status == task.QUEUED, + sql.Task.added <= datetime.datetime.now(datetime.UTC) - datetime.timedelta(seconds=2), + ) + ) .order_by(sql.validate_instrumented_attribute(sql.Task.added).asc()) .limit(1) ) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
