This is an automated email from the ASF dual-hosted git repository.
arm pushed a commit to branch arm
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git
The following commit(s) were added to refs/heads/arm by this push:
new aab1b906 Add daily maintenance task
aab1b906 is described below
commit aab1b9069561bdccd8a6494e1717e84097b02a29
Author: Alastair McFarlane <[email protected]>
AuthorDate: Tue Apr 7 10:22:41 2026 +0100
Add daily maintenance task
---
atr/models/results.py | 7 ++++++
atr/models/sql.py | 1 +
atr/server.py | 11 +++++----
atr/tasks/__init__.py | 50 +++++++++++++++++++++++++++++++++++---
atr/tasks/distribution.py | 9 +------
atr/tasks/gha.py | 13 +---------
atr/tasks/maintenance.py | 62 +++++++++++++++++++++++++++++++++++++++++++++++
atr/tasks/metadata.py | 10 +-------
8 files changed, 126 insertions(+), 37 deletions(-)
diff --git a/atr/models/results.py b/atr/models/results.py
index cd04e226..bc1a205a 100644
--- a/atr/models/results.py
+++ b/atr/models/results.py
@@ -235,6 +235,12 @@ class VoteInitiate(schema.Strict):
mail_send_warnings: list[str] = schema.description("Warnings from the mail server")
+class Maintenance(schema.Strict):
+ """Result of the task to run scheduled maintenance."""
+
+ kind: Literal["maintenance"] = schema.Field(alias="kind")
+
+
class MetadataUpdate(schema.Strict):
"""Result of the task to update metadata from Whimsy."""
@@ -248,6 +254,7 @@ Results = Annotated[
| DistributionWorkflow
| DistributionWorkflowStatus
| HashingCheck
+ | Maintenance
| MessageSend
| MetadataUpdate
| SBOMAugment
diff --git a/atr/models/sql.py b/atr/models/sql.py
index 6346743a..e9479431 100644
--- a/atr/models/sql.py
+++ b/atr/models/sql.py
@@ -208,6 +208,7 @@ class TaskType(enum.StrEnum):
KEYS_IMPORT_FILE = "keys_import_file"
LICENSE_FILES = "license_files"
LICENSE_HEADERS = "license_headers"
+ MAINTENANCE = "maintenance"
MESSAGE_SEND = "message_send"
METADATA_UPDATE = "metadata_update"
PATHS_CHECK = "paths_check"
diff --git a/atr/server.py b/atr/server.py
index 6def8951..8a65aff6 100644
--- a/atr/server.py
+++ b/atr/server.py
@@ -622,9 +622,6 @@ def _create_app(app_config: type[config.AppConfig]) -> base.QuartApp:
_app_setup_security_headers(app)
_app_setup_request_lifecycle(app)
_app_setup_lifecycle(app, app_config)
-
- # _register_recurrent_tasks()
-
# do not enable template pre-loading if we explicitly want to reload templates
if not app_config.TEMPLATES_AUTO_RELOAD:
preload.setup_template_preloading(app)
@@ -908,10 +905,14 @@ def _pending_migrations(state_dir: pathlib.Path) -> set[tuple[str, str]]:
async def _register_recurrent_tasks() -> None:
"""Schedule recurring tasks"""
- # Start scheduled tasks 5 min after server start
+ await tasks.clear_scheduled()
+ # Run maintenance task immediately on server startup
+ maintenance = await tasks.run_maintenance(asf_uid="system", schedule_next=True)
+ log.info(f"Scheduled maintenance with ID {maintenance.id}")
+ # Start other tasks 5 min after server start
await asyncio.sleep(300)
try:
- await tasks.clear_scheduled()
+ await asyncio.sleep(60)
metadata = await tasks.metadata_update(asf_uid="system", schedule_next=True)
log.info(f"Scheduled remote metadata update with ID {metadata.id}")
await asyncio.sleep(60)
diff --git a/atr/tasks/__init__.py b/atr/tasks/__init__.py
index 6c4327cf..71d16967 100644
--- a/atr/tasks/__init__.py
+++ b/atr/tasks/__init__.py
@@ -27,6 +27,7 @@ import sqlmodel
import atr.attestable as attestable
import atr.db as db
import atr.hashes as hashes
+import atr.log as log
import atr.models.results as results
import atr.models.safe as safe
import atr.models.sql as sql
@@ -43,6 +44,7 @@ import atr.tasks.checks.zipformat as zipformat
import atr.tasks.distribution as distribution
import atr.tasks.gha as gha
import atr.tasks.keys as keys
+import atr.tasks.maintenance as maintenance
import atr.tasks.message as message
import atr.tasks.metadata as metadata
import atr.tasks.quarantine as quarantine
@@ -51,6 +53,9 @@ import atr.tasks.svn as svn
import atr.tasks.vote as vote
import atr.util as util
+_EVERY_2_MINUTES = 60 * 2
+_DAILY = 60 * 60 * 24
+
async def asc_checks(
asf_uid: str, release: sql.Release, revision: safe.RevisionNumber, signature_path: str, data: db.Session
@@ -112,7 +117,7 @@ async def distribution_status_check(
"""Queue a workflow status update task."""
args = distribution.DistributionStatusCheckArgs(next_schedule_seconds=0, asf_uid=asf_uid)
if schedule_next:
- args.next_schedule_seconds = 2 * 60
+ args.next_schedule_seconds = _EVERY_2_MINUTES
async with db.ensure_session(caller_data) as data:
task = sql.Task(
status=sql.TaskStatus.QUEUED,
@@ -231,6 +236,35 @@ async def keys_import_file(
await data.commit()
+async def run_maintenance(
+ asf_uid: str,
+ caller_data: db.Session | None = None,
+ schedule: datetime.datetime | None = None,
+ schedule_next: bool = False,
+) -> sql.Task:
+ """Queue a maintenance task."""
+ args = maintenance.MaintenanceArgs(asf_uid=asf_uid, next_schedule_seconds=0)
+ if schedule_next:
+ args.next_schedule_seconds = _DAILY
+ async with db.ensure_session(caller_data) as data:
+ task = sql.Task(
+ status=sql.TaskStatus.QUEUED,
+ task_type=sql.TaskType.MAINTENANCE,
+ task_args=args.model_dump(),
+ asf_uid=asf_uid,
+ revision_number=None,
+ primary_rel_path=None,
+ project_key=None,
+ version_key=None,
+ )
+ if schedule:
+ task.scheduled = schedule
+ data.add(task)
+ await data.commit()
+ await data.flush()
+ return task
+
+
async def metadata_update(
asf_uid: str,
caller_data: db.Session | None = None,
@@ -240,7 +274,7 @@ async def metadata_update(
"""Queue a metadata update task."""
args = metadata.Update(asf_uid=asf_uid, next_schedule_seconds=0)
if schedule_next:
- args.next_schedule_seconds = 60 * 60 * 24
+ args.next_schedule_seconds = _DAILY
async with db.ensure_session(caller_data) as data:
task = sql.Task(
status=sql.TaskStatus.QUEUED,
@@ -310,6 +344,8 @@ def resolve(task_type: sql.TaskType) -> Callable[..., Awaitable[results.Results
return license.files
case sql.TaskType.LICENSE_HEADERS:
return license.headers
+ case sql.TaskType.MAINTENANCE:
+ return maintenance.run
case sql.TaskType.MESSAGE_SEND:
return message.send
case sql.TaskType.METADATA_UPDATE:
@@ -380,6 +416,14 @@ async def sha_checks(
return await asyncio.gather(*tasks)
+async def schedule_next(asf_uid: str, seconds: int, task: Callable[..., Awaitable[sql.Task]]) -> None:
+ """Schedule the next run of a recurring task."""
+ if seconds > 0:
+ next_schedule = datetime.datetime.now(datetime.UTC) + datetime.timedelta(seconds=seconds)
+ await task(asf_uid, schedule=next_schedule, schedule_next=True)
+ log.info(f"Scheduled next run for: {next_schedule.strftime('%Y-%m-%d %H:%M:%S')}")
+
+
async def tar_gz_checks(
asf_uid: str, release: sql.Release, revision: safe.RevisionNumber, path: str, data: db.Session
) -> list[sql.Task | None]:
@@ -459,7 +503,7 @@ async def workflow_update(
"""Queue a workflow status update task."""
args = gha.WorkflowStatusCheck(next_schedule_seconds=0, run_id=0, asf_uid=asf_uid)
if schedule_next:
- args.next_schedule_seconds = 2 * 60
+ args.next_schedule_seconds = _EVERY_2_MINUTES
async with db.ensure_session(caller_data) as data:
task = sql.Task(
status=sql.TaskStatus.QUEUED,
diff --git a/atr/tasks/distribution.py b/atr/tasks/distribution.py
index 6d27d80b..d3742690 100644
--- a/atr/tasks/distribution.py
+++ b/atr/tasks/distribution.py
@@ -14,8 +14,6 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-import datetime
-
import pydantic
import atr.db as db
@@ -70,12 +68,7 @@ async def status_check(args: DistributionStatusCheckArgs, *, task_id: int | None
except (distribution.DistributionError, storage.AccessError) as e:
msg = f"Failed to record distribution: {e}"
log.error(msg)
- if args.next_schedule_seconds:
- next_schedule = datetime.datetime.now(datetime.UTC) + datetime.timedelta(seconds=args.next_schedule_seconds)
- await tasks.distribution_status_check(args.asf_uid, schedule=next_schedule, schedule_next=True)
- log.info(
- f"Scheduled next workflow status update for: {next_schedule.strftime('%Y-%m-%d %H:%M:%S')}",
- )
+ await tasks.schedule_next(args.asf_uid, args.next_schedule_seconds, tasks.distribution_status_check)
return results.DistributionStatusCheck(
kind="distribution_status",
)
diff --git a/atr/tasks/gha.py b/atr/tasks/gha.py
index f3d32796..660040b4 100644
--- a/atr/tasks/gha.py
+++ b/atr/tasks/gha.py
@@ -15,7 +15,6 @@
# specific language governing permissions and limitations
# under the License.
import asyncio
-import datetime
import json
import uuid
from collections.abc import Callable
@@ -125,8 +124,7 @@ async def status_check(args: WorkflowStatusCheck) -> results.DistributionWorkflo
f"Workflow status update completed: updated {updated_count} workflow(s)",
)
- # Schedule next update
- await _schedule_next(args)
+ await tasks.schedule_next(args.asf_uid, args.next_schedule_seconds, tasks.workflow_update)
return results.DistributionWorkflowStatus(
kind="distribution_workflow_status",
@@ -249,15 +247,6 @@ async def _request_and_retry(
return None
-async def _schedule_next(args: WorkflowStatusCheck) -> None:
- if args.next_schedule_seconds:
- next_schedule = datetime.datetime.now(datetime.UTC) + datetime.timedelta(seconds=args.next_schedule_seconds)
- await tasks.workflow_update(args.asf_uid, schedule=next_schedule, schedule_next=True)
- log.info(
- f"Scheduled next workflow status update for: {next_schedule.strftime('%Y-%m-%d %H:%M:%S')}",
- )
-
-
#
# async def _wait_for_completion(
# session: aiohttp.ClientSession,
diff --git a/atr/tasks/maintenance.py b/atr/tasks/maintenance.py
new file mode 100644
index 00000000..171c9a76
--- /dev/null
+++ b/atr/tasks/maintenance.py
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pydantic
+
+import atr.log as log
+import atr.models.results as results
+import atr.models.schema as schema
+import atr.tasks as tasks
+import atr.tasks.checks as checks
+
+
+class MaintenanceArgs(schema.Strict):
+ """Arguments for the task to perform scheduled maintenance."""
+
+ asf_uid: str = schema.description("The ASF UID of the user triggering the maintenance")
+ next_schedule_seconds: int = pydantic.Field(default=0, description="The next scheduled time")
+
+
+class MaintenanceError(Exception):
+ pass
+
+
[email protected]_model(MaintenanceArgs)
+async def run(args: MaintenanceArgs) -> results.Results | None:
+ """Run maintenance."""
+ log.info(f"Starting maintenance (user: {args.asf_uid})")
+
+ try:
+ await _storage_maintenance()
+
+ log.info(
+ "Storage maintenance completed successfully",
+ )
+
+ await tasks.schedule_next(args.asf_uid, args.next_schedule_seconds, tasks.run_maintenance)
+
+ return results.Maintenance(
+ kind="maintenance",
+ )
+ except Exception as e:
+ error_msg = f"Unexpected error during maintenance: {e!s}"
+ log.exception("Maintenance failed with unexpected error")
+ raise MaintenanceError(error_msg) from e
+
+
+async def _storage_maintenance():
+ pass
diff --git a/atr/tasks/metadata.py b/atr/tasks/metadata.py
index 5de4b77e..1edf8fe1 100644
--- a/atr/tasks/metadata.py
+++ b/atr/tasks/metadata.py
@@ -15,8 +15,6 @@
# specific language governing permissions and limitations
# under the License.
-import datetime
-
import aiohttp
import pydantic
@@ -51,13 +49,7 @@ async def update(args: Update) -> results.Results | None:
f"Metadata update completed successfully: added {added_count}, updated {updated_count}",
)
- # Schedule next update
- if args.next_schedule_seconds and (args.next_schedule_seconds > 0):
- next_schedule = datetime.datetime.now(datetime.UTC) + datetime.timedelta(seconds=args.next_schedule_seconds)
- await tasks.metadata_update(args.asf_uid, schedule=next_schedule, schedule_next=True)
- log.info(
- f"Scheduled next metadata update for: {next_schedule.strftime('%Y-%m-%d %H:%M:%S')}",
- )
+ await tasks.schedule_next(args.asf_uid, args.next_schedule_seconds, tasks.metadata_update)
return results.MetadataUpdate(
kind="metadata_update",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]