This is an automated email from the ASF dual-hosted git repository.

arm pushed a commit to branch arm
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git


The following commit(s) were added to refs/heads/arm by this push:
     new aab1b906 Add daily maintenance task
aab1b906 is described below

commit aab1b9069561bdccd8a6494e1717e84097b02a29
Author: Alastair McFarlane <[email protected]>
AuthorDate: Tue Apr 7 10:22:41 2026 +0100

    Add daily maintenance task
---
 atr/models/results.py     |  7 ++++++
 atr/models/sql.py         |  1 +
 atr/server.py             | 11 +++++----
 atr/tasks/__init__.py     | 50 +++++++++++++++++++++++++++++++++++---
 atr/tasks/distribution.py |  9 +------
 atr/tasks/gha.py          | 13 +---------
 atr/tasks/maintenance.py  | 62 +++++++++++++++++++++++++++++++++++++++++++++++
 atr/tasks/metadata.py     | 10 +-------
 8 files changed, 126 insertions(+), 37 deletions(-)

diff --git a/atr/models/results.py b/atr/models/results.py
index cd04e226..bc1a205a 100644
--- a/atr/models/results.py
+++ b/atr/models/results.py
@@ -235,6 +235,12 @@ class VoteInitiate(schema.Strict):
     mail_send_warnings: list[str] = schema.description("Warnings from the mail 
server")
 
 
+class Maintenance(schema.Strict):
+    """Result of the task to run scheduled maintenance."""
+
+    kind: Literal["maintenance"] = schema.Field(alias="kind")
+
+
 class MetadataUpdate(schema.Strict):
     """Result of the task to update metadata from Whimsy."""
 
@@ -248,6 +254,7 @@ Results = Annotated[
     | DistributionWorkflow
     | DistributionWorkflowStatus
     | HashingCheck
+    | Maintenance
     | MessageSend
     | MetadataUpdate
     | SBOMAugment
diff --git a/atr/models/sql.py b/atr/models/sql.py
index 6346743a..e9479431 100644
--- a/atr/models/sql.py
+++ b/atr/models/sql.py
@@ -208,6 +208,7 @@ class TaskType(enum.StrEnum):
     KEYS_IMPORT_FILE = "keys_import_file"
     LICENSE_FILES = "license_files"
     LICENSE_HEADERS = "license_headers"
+    MAINTENANCE = "maintenance"
     MESSAGE_SEND = "message_send"
     METADATA_UPDATE = "metadata_update"
     PATHS_CHECK = "paths_check"
diff --git a/atr/server.py b/atr/server.py
index 6def8951..8a65aff6 100644
--- a/atr/server.py
+++ b/atr/server.py
@@ -622,9 +622,6 @@ def _create_app(app_config: type[config.AppConfig]) -> 
base.QuartApp:
     _app_setup_security_headers(app)
     _app_setup_request_lifecycle(app)
     _app_setup_lifecycle(app, app_config)
-
-    # _register_recurrent_tasks()
-
     # do not enable template pre-loading if we explicitly want to reload 
templates
     if not app_config.TEMPLATES_AUTO_RELOAD:
         preload.setup_template_preloading(app)
@@ -908,10 +905,14 @@ def _pending_migrations(state_dir: pathlib.Path) -> 
set[tuple[str, str]]:
 
 async def _register_recurrent_tasks() -> None:
     """Schedule recurring tasks"""
-    # Start scheduled tasks 5 min after server start
+    await tasks.clear_scheduled()
+    # Run maintenance task immediately on server startup
+    maintenance = await tasks.run_maintenance(asf_uid="system", 
schedule_next=True)
+    log.info(f"Scheduled maintenance with ID {maintenance.id}")
+    # Start other tasks 5 min after server start
     await asyncio.sleep(300)
     try:
-        await tasks.clear_scheduled()
+        await asyncio.sleep(60)
         metadata = await tasks.metadata_update(asf_uid="system", 
schedule_next=True)
         log.info(f"Scheduled remote metadata update with ID {metadata.id}")
         await asyncio.sleep(60)
diff --git a/atr/tasks/__init__.py b/atr/tasks/__init__.py
index 6c4327cf..71d16967 100644
--- a/atr/tasks/__init__.py
+++ b/atr/tasks/__init__.py
@@ -27,6 +27,7 @@ import sqlmodel
 import atr.attestable as attestable
 import atr.db as db
 import atr.hashes as hashes
+import atr.log as log
 import atr.models.results as results
 import atr.models.safe as safe
 import atr.models.sql as sql
@@ -43,6 +44,7 @@ import atr.tasks.checks.zipformat as zipformat
 import atr.tasks.distribution as distribution
 import atr.tasks.gha as gha
 import atr.tasks.keys as keys
+import atr.tasks.maintenance as maintenance
 import atr.tasks.message as message
 import atr.tasks.metadata as metadata
 import atr.tasks.quarantine as quarantine
@@ -51,6 +53,9 @@ import atr.tasks.svn as svn
 import atr.tasks.vote as vote
 import atr.util as util
 
+_EVERY_2_MINUTES = 60 * 2
+_DAILY = 60 * 60 * 24
+
 
 async def asc_checks(
     asf_uid: str, release: sql.Release, revision: safe.RevisionNumber, 
signature_path: str, data: db.Session
@@ -112,7 +117,7 @@ async def distribution_status_check(
     """Queue a workflow status update task."""
     args = distribution.DistributionStatusCheckArgs(next_schedule_seconds=0, 
asf_uid=asf_uid)
     if schedule_next:
-        args.next_schedule_seconds = 2 * 60
+        args.next_schedule_seconds = _EVERY_2_MINUTES
     async with db.ensure_session(caller_data) as data:
         task = sql.Task(
             status=sql.TaskStatus.QUEUED,
@@ -231,6 +236,35 @@ async def keys_import_file(
         await data.commit()
 
 
+async def run_maintenance(
+    asf_uid: str,
+    caller_data: db.Session | None = None,
+    schedule: datetime.datetime | None = None,
+    schedule_next: bool = False,
+) -> sql.Task:
+    """Queue a maintenance task."""
+    args = maintenance.MaintenanceArgs(asf_uid=asf_uid, 
next_schedule_seconds=0)
+    if schedule_next:
+        args.next_schedule_seconds = _DAILY
+    async with db.ensure_session(caller_data) as data:
+        task = sql.Task(
+            status=sql.TaskStatus.QUEUED,
+            task_type=sql.TaskType.MAINTENANCE,
+            task_args=args.model_dump(),
+            asf_uid=asf_uid,
+            revision_number=None,
+            primary_rel_path=None,
+            project_key=None,
+            version_key=None,
+        )
+        if schedule:
+            task.scheduled = schedule
+        data.add(task)
+        await data.commit()
+        await data.flush()
+        return task
+
+
 async def metadata_update(
     asf_uid: str,
     caller_data: db.Session | None = None,
@@ -240,7 +274,7 @@ async def metadata_update(
     """Queue a metadata update task."""
     args = metadata.Update(asf_uid=asf_uid, next_schedule_seconds=0)
     if schedule_next:
-        args.next_schedule_seconds = 60 * 60 * 24
+        args.next_schedule_seconds = _DAILY
     async with db.ensure_session(caller_data) as data:
         task = sql.Task(
             status=sql.TaskStatus.QUEUED,
@@ -310,6 +344,8 @@ def resolve(task_type: sql.TaskType) -> Callable[..., 
Awaitable[results.Results
             return license.files
         case sql.TaskType.LICENSE_HEADERS:
             return license.headers
+        case sql.TaskType.MAINTENANCE:
+            return maintenance.run
         case sql.TaskType.MESSAGE_SEND:
             return message.send
         case sql.TaskType.METADATA_UPDATE:
@@ -380,6 +416,14 @@ async def sha_checks(
     return await asyncio.gather(*tasks)
 
 
+async def schedule_next(asf_uid: str, seconds: int, task: Callable[..., 
Awaitable[sql.Task]]) -> None:
+    """Schedule the next run of a recurring task."""
+    if seconds > 0:
+        next_schedule = datetime.datetime.now(datetime.UTC) + 
datetime.timedelta(seconds=seconds)
+        await task(asf_uid, schedule=next_schedule, schedule_next=True)
+        log.info(f"Scheduled next run for: {next_schedule.strftime('%Y-%m-%d 
%H:%M:%S')}")
+
+
 async def tar_gz_checks(
     asf_uid: str, release: sql.Release, revision: safe.RevisionNumber, path: 
str, data: db.Session
 ) -> list[sql.Task | None]:
@@ -459,7 +503,7 @@ async def workflow_update(
     """Queue a workflow status update task."""
     args = gha.WorkflowStatusCheck(next_schedule_seconds=0, run_id=0, 
asf_uid=asf_uid)
     if schedule_next:
-        args.next_schedule_seconds = 2 * 60
+        args.next_schedule_seconds = _EVERY_2_MINUTES
     async with db.ensure_session(caller_data) as data:
         task = sql.Task(
             status=sql.TaskStatus.QUEUED,
diff --git a/atr/tasks/distribution.py b/atr/tasks/distribution.py
index 6d27d80b..d3742690 100644
--- a/atr/tasks/distribution.py
+++ b/atr/tasks/distribution.py
@@ -14,8 +14,6 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-import datetime
-
 import pydantic
 
 import atr.db as db
@@ -70,12 +68,7 @@ async def status_check(args: DistributionStatusCheckArgs, *, 
task_id: int | None
         except (distribution.DistributionError, storage.AccessError) as e:
             msg = f"Failed to record distribution: {e}"
             log.error(msg)
-    if args.next_schedule_seconds:
-        next_schedule = datetime.datetime.now(datetime.UTC) + 
datetime.timedelta(seconds=args.next_schedule_seconds)
-        await tasks.distribution_status_check(args.asf_uid, 
schedule=next_schedule, schedule_next=True)
-        log.info(
-            f"Scheduled next workflow status update for: 
{next_schedule.strftime('%Y-%m-%d %H:%M:%S')}",
-        )
+    await tasks.schedule_next(args.asf_uid, args.next_schedule_seconds, 
tasks.distribution_status_check)
     return results.DistributionStatusCheck(
         kind="distribution_status",
     )
diff --git a/atr/tasks/gha.py b/atr/tasks/gha.py
index f3d32796..660040b4 100644
--- a/atr/tasks/gha.py
+++ b/atr/tasks/gha.py
@@ -15,7 +15,6 @@
 # specific language governing permissions and limitations
 # under the License.
 import asyncio
-import datetime
 import json
 import uuid
 from collections.abc import Callable
@@ -125,8 +124,7 @@ async def status_check(args: WorkflowStatusCheck) -> 
results.DistributionWorkflo
             f"Workflow status update completed: updated {updated_count} 
workflow(s)",
         )
 
-        # Schedule next update
-        await _schedule_next(args)
+        await tasks.schedule_next(args.asf_uid, args.next_schedule_seconds, 
tasks.workflow_update)
 
         return results.DistributionWorkflowStatus(
             kind="distribution_workflow_status",
@@ -249,15 +247,6 @@ async def _request_and_retry(
     return None
 
 
-async def _schedule_next(args: WorkflowStatusCheck) -> None:
-    if args.next_schedule_seconds:
-        next_schedule = datetime.datetime.now(datetime.UTC) + 
datetime.timedelta(seconds=args.next_schedule_seconds)
-        await tasks.workflow_update(args.asf_uid, schedule=next_schedule, 
schedule_next=True)
-        log.info(
-            f"Scheduled next workflow status update for: 
{next_schedule.strftime('%Y-%m-%d %H:%M:%S')}",
-        )
-
-
 #
 # async def _wait_for_completion(
 #     session: aiohttp.ClientSession,
diff --git a/atr/tasks/maintenance.py b/atr/tasks/maintenance.py
new file mode 100644
index 00000000..171c9a76
--- /dev/null
+++ b/atr/tasks/maintenance.py
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pydantic
+
+import atr.log as log
+import atr.models.results as results
+import atr.models.schema as schema
+import atr.tasks as tasks
+import atr.tasks.checks as checks
+
+
+class MaintenanceArgs(schema.Strict):
+    """Arguments for the task to perform scheduled maintenance."""
+
+    asf_uid: str = schema.description("The ASF UID of the user triggering the 
maintenance")
+    next_schedule_seconds: int = pydantic.Field(default=0, description="Seconds 
until the next scheduled run; 0 disables rescheduling")
+
+
+class MaintenanceError(Exception):
+    pass
+
+
[email protected]_model(MaintenanceArgs)
+async def run(args: MaintenanceArgs) -> results.Results | None:
+    """Run maintenance."""
+    log.info(f"Starting maintenance (user: {args.asf_uid})")
+
+    try:
+        await _storage_maintenance()
+
+        log.info(
+            "Storage maintenance completed successfully",
+        )
+
+        await tasks.schedule_next(args.asf_uid, args.next_schedule_seconds, 
tasks.run_maintenance)
+
+        return results.Maintenance(
+            kind="maintenance",
+        )
+    except Exception as e:
+        error_msg = f"Unexpected error during maintenance: {e!s}"
+        log.exception("Maintenance failed with unexpected error")
+        raise MaintenanceError(error_msg) from e
+
+
+async def _storage_maintenance():
+    pass
diff --git a/atr/tasks/metadata.py b/atr/tasks/metadata.py
index 5de4b77e..1edf8fe1 100644
--- a/atr/tasks/metadata.py
+++ b/atr/tasks/metadata.py
@@ -15,8 +15,6 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import datetime
-
 import aiohttp
 import pydantic
 
@@ -51,13 +49,7 @@ async def update(args: Update) -> results.Results | None:
             f"Metadata update completed successfully: added {added_count}, 
updated {updated_count}",
         )
 
-        # Schedule next update
-        if args.next_schedule_seconds and (args.next_schedule_seconds > 0):
-            next_schedule = datetime.datetime.now(datetime.UTC) + 
datetime.timedelta(seconds=args.next_schedule_seconds)
-            await tasks.metadata_update(args.asf_uid, schedule=next_schedule, 
schedule_next=True)
-            log.info(
-                f"Scheduled next metadata update for: 
{next_schedule.strftime('%Y-%m-%d %H:%M:%S')}",
-            )
+        await tasks.schedule_next(args.asf_uid, args.next_schedule_seconds, 
tasks.metadata_update)
 
         return results.MetadataUpdate(
             kind="metadata_update",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to