This is an automated email from the ASF dual-hosted git repository.

arm pushed a commit to branch scheduled_tasks
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git

commit 31d91cc540a164eccd662f2a6944e6f77005d933
Author: Alastair McFarlane <[email protected]>
AuthorDate: Thu Jan 15 11:33:03 2026 +0000

    Add the ability to schedule future jobs. Worker claim updated to not claim 
future tasks. Metadata and workflow status tasks can schedule themselves.
---
 atr/db/__init__.py    |  7 ++++
 atr/models/results.py |  7 ++++
 atr/models/sql.py     |  1 +
 atr/server.py         | 61 +++++++++++++++++++++-------------
 atr/tasks/__init__.py | 67 +++++++++++++++++++++++++++++++++++--
 atr/tasks/gha.py      | 91 +++++++++++++++++++++++++++++++++++++++++++++++++++
 atr/tasks/metadata.py | 14 ++++++++
 atr/worker.py         |  7 +++-
 8 files changed, 230 insertions(+), 25 deletions(-)

diff --git a/atr/db/__init__.py b/atr/db/__init__.py
index 050f307..8250a90 100644
--- a/atr/db/__init__.py
+++ b/atr/db/__init__.py
@@ -755,7 +755,10 @@ class Session(sqlalchemy.ext.asyncio.AsyncSession):
         run_id: Opt[int] = NOT_SET,
         project_name: Opt[str] = NOT_SET,
         task_id: Opt[int] = NOT_SET,
+        status: Opt[str] = NOT_SET,
+        status_in: Opt[list[str]] = NOT_SET,
     ) -> Query[sql.WorkflowStatus]:
+        via = sql.validate_instrumented_attribute
         query = sqlmodel.select(sql.WorkflowStatus)
 
         if is_defined(workflow_id):
@@ -766,6 +769,10 @@ class Session(sqlalchemy.ext.asyncio.AsyncSession):
             query = query.where(sql.WorkflowStatus.project_name == 
project_name)
         if is_defined(task_id):
             query = query.where(sql.WorkflowStatus.task_id == task_id)
+        if is_defined(status):
+            query = query.where(sql.WorkflowStatus.status == status)
+        if is_defined(status_in):
+            query = query.where(via(sql.WorkflowStatus.status).in_(status_in))
 
         return Query(self, query)
 
diff --git a/atr/models/results.py b/atr/models/results.py
index 5dc1ce2..6893f28 100644
--- a/atr/models/results.py
+++ b/atr/models/results.py
@@ -33,6 +33,12 @@ class DistributionWorkflow(schema.Strict):
     url: str = schema.description("The URL of the workflow run")
 
 
+class DistributionWorkflowStatus(schema.Strict):
+    """Result of the task to update GitHub workflow statuses."""
+
+    kind: Literal["distribution_workflow_status"] = schema.Field(alias="kind")
+
+
 class HashingCheck(schema.Strict):
     """Result of the task to check the hash of a file."""
 
@@ -194,6 +200,7 @@ class MetadataUpdate(schema.Strict):
 
 Results = Annotated[
     DistributionWorkflow
+    | DistributionWorkflowStatus
     | HashingCheck
     | MessageSend
     | MetadataUpdate
diff --git a/atr/models/sql.py b/atr/models/sql.py
index 296c65d..43a3da3 100644
--- a/atr/models/sql.py
+++ b/atr/models/sql.py
@@ -207,6 +207,7 @@ class TaskType(str, enum.Enum):
     TARGZ_INTEGRITY = "targz_integrity"
     TARGZ_STRUCTURE = "targz_structure"
     VOTE_INITIATE = "vote_initiate"
+    WORKFLOW_STATUS = "workflow_status"
     ZIPFORMAT_INTEGRITY = "zipformat_integrity"
     ZIPFORMAT_STRUCTURE = "zipformat_structure"
 
diff --git a/atr/server.py b/atr/server.py
index 71fe523..f020d07 100644
--- a/atr/server.py
+++ b/atr/server.py
@@ -202,9 +202,8 @@ def _app_setup_lifecycle(app: base.QuartApp) -> None:
         worker_manager = manager.get_worker_manager()
         await worker_manager.start()
 
-        # Start the metadata update scheduler
-        metadata_scheduler_task = 
asyncio.create_task(_metadata_update_scheduler())
-        app.extensions["metadata_scheduler"] = metadata_scheduler_task
+        # Register recurring tasks (metadata updates, workflow status checks, 
etc.)
+        await _register_recurrent_tasks()
 
         await _initialise_test_environment()
 
@@ -246,13 +245,13 @@ def _app_setup_lifecycle(app: base.QuartApp) -> None:
         await worker_manager.stop()
 
         # Stop the metadata scheduler
-        metadata_scheduler = app.extensions.get("metadata_scheduler")
-        if metadata_scheduler:
-            metadata_scheduler.cancel()
-            try:
-                await metadata_scheduler
-            except asyncio.CancelledError:
-                ...
+        # metadata_scheduler = app.extensions.get("metadata_scheduler")
+        # if metadata_scheduler:
+        #     metadata_scheduler.cancel()
+        #     try:
+        #         await metadata_scheduler
+        #     except asyncio.CancelledError:
+        #         ...
 
         ssh_server = app.extensions.get("ssh_server")
         if ssh_server:
@@ -406,6 +405,8 @@ def _create_app(app_config: type[config.AppConfig]) -> 
base.QuartApp:
     _app_setup_security_headers(app)
     _app_setup_lifecycle(app)
 
+    # _register_recurrent_tasks()
+
     # do not enable template pre-loading if we explicitly want to reload 
templates
     if not app_config.TEMPLATES_AUTO_RELOAD:
         preload.setup_template_preloading(app)
@@ -463,20 +464,36 @@ async def _initialise_test_environment() -> None:
             await data.commit()
 
 
-async def _metadata_update_scheduler() -> None:
-    """Periodically schedule remote metadata updates."""
-    # Wait one minute to allow the server to start
-    await asyncio.sleep(60)
+#
+# async def _metadata_update_scheduler() -> None:
+#     """Periodically schedule remote metadata updates."""
+#     # Wait one minute to allow the server to start
+#     await asyncio.sleep(60)
+#
+#     while True:
+#         try:
+#             task = await tasks.metadata_update(asf_uid="system")
+#             log.info(f"Scheduled remote metadata update with ID {task.id}")
+#         except Exception as e:
+#             log.exception(f"Failed to schedule remote metadata update: 
{e!s}")
+#
+#         # Schedule next update in 24 hours
+#         await asyncio.sleep(86400)
 
-    while True:
-        try:
-            task = await tasks.metadata_update(asf_uid="system")
-            log.info(f"Scheduled remote metadata update with ID {task.id}")
-        except Exception as e:
-            log.exception(f"Failed to schedule remote metadata update: {e!s}")
 
-        # Schedule next update in 24 hours
-        await asyncio.sleep(86400)
+async def _register_recurrent_tasks() -> None:
+    """Schedule recurring tasks."""
+    # Wait 30 seconds to allow the server to start
+    await asyncio.sleep(30)
+    try:
+        await tasks.clear_scheduled()
+        metadata = await tasks.metadata_update(asf_uid="system", 
schedule_next=True)
+        log.info(f"Scheduled remote metadata update with ID {metadata.id}")
+        workflow = await tasks.workflow_update(asf_uid="system", 
schedule_next=True)
+        log.info(f"Scheduled workflow status update with ID {workflow.id}")
+
+    except Exception as e:
+        log.exception(f"Failed to schedule recurrent tasks: {e!s}")
 
 
 def _register_routes(app: base.QuartApp) -> None:
diff --git a/atr/tasks/__init__.py b/atr/tasks/__init__.py
index 9d89e31..99f2328 100644
--- a/atr/tasks/__init__.py
+++ b/atr/tasks/__init__.py
@@ -15,9 +15,12 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import datetime
 from collections.abc import Awaitable, Callable, Coroutine
 from typing import Any, Final
 
+import sqlmodel
+
 import atr.db as db
 import atr.models.results as results
 import atr.models.sql as sql
@@ -57,6 +60,27 @@ async def asc_checks(asf_uid: str, release: sql.Release, 
revision: str, signatur
     return tasks
 
 
+async def clear_scheduled(caller_data: db.Session | None = None):
+    """Clear all future scheduled tasks of the given types."""
+    async with db.ensure_session(caller_data) as data:
+        via = sql.validate_instrumented_attribute
+        now = datetime.datetime.now(datetime.UTC)
+
+        delete_stmt = sqlmodel.delete(sql.Task).where(
+            via(sql.Task.task_type).in_(
+                [
+                    sql.TaskType.METADATA_UPDATE,
+                    sql.TaskType.WORKFLOW_STATUS,
+                ]
+            ),
+            via(sql.Task.status) == sql.TaskStatus.QUEUED,
+            via(sql.Task.added) > now,
+        )
+
+        await data.execute(delete_stmt)
+        await data.commit()
+
+
 async def draft_checks(
     asf_uid: str, project_name: str, release_version: str, revision_number: 
str, caller_data: db.Session | None = None
 ) -> int:
@@ -150,17 +174,27 @@ async def keys_import_file(
         await data.commit()
 
 
-async def metadata_update(asf_uid: str, caller_data: db.Session | None = None) 
-> sql.Task:
+async def metadata_update(
+    asf_uid: str,
+    caller_data: db.Session | None = None,
+    schedule: datetime.datetime | None = None,
+    schedule_next: bool = False,
+) -> sql.Task:
     """Queue a metadata update task."""
+    args = metadata.Update(asf_uid=asf_uid, next_schedule=0)
+    if schedule_next:
+        args.next_schedule = 60 * 24
     async with db.ensure_session(caller_data) as data:
         task = sql.Task(
             status=sql.TaskStatus.QUEUED,
             task_type=sql.TaskType.METADATA_UPDATE,
-            task_args=metadata.Update(asf_uid=asf_uid).model_dump(),
+            task_args=args.model_dump(),
             asf_uid=asf_uid,
             revision_number=None,
             primary_rel_path=None,
         )
+        if schedule:
+            task.added = schedule
         data.add(task)
         await data.commit()
         await data.flush()
@@ -227,6 +261,8 @@ def resolve(task_type: sql.TaskType) -> Callable[..., 
Awaitable[results.Results
             return targz.structure
         case sql.TaskType.VOTE_INITIATE:
             return vote.initiate
+        case sql.TaskType.WORKFLOW_STATUS:
+            return gha.status_check
         case sql.TaskType.ZIPFORMAT_INTEGRITY:
             return zipformat.integrity
         case sql.TaskType.ZIPFORMAT_STRUCTURE:
@@ -259,6 +295,33 @@ async def tar_gz_checks(asf_uid: str, release: 
sql.Release, revision: str, path:
     return tasks
 
 
+async def workflow_update(
+    asf_uid: str,
+    caller_data: db.Session | None = None,
+    schedule: datetime.datetime | None = None,
+    schedule_next: bool = False,
+) -> sql.Task:
+    """Queue a workflow status update task."""
+    args = gha.WorkflowStatusCheck(next_schedule=0, run_id=0)
+    if schedule_next:
+        args.next_schedule = 2
+    async with db.ensure_session(caller_data) as data:
+        task = sql.Task(
+            status=sql.TaskStatus.QUEUED,
+            task_type=sql.TaskType.WORKFLOW_STATUS,
+            task_args=args.model_dump(),
+            asf_uid=asf_uid,
+            revision_number=None,
+            primary_rel_path=None,
+        )
+        if schedule:
+            task.added = schedule
+        data.add(task)
+        await data.commit()
+        await data.flush()
+        return task
+
+
 async def zip_checks(asf_uid: str, release: sql.Release, revision: str, path: 
str) -> list[sql.Task]:
     """Create check tasks for a .zip file."""
     # This release has committee, as guaranteed in draft_checks
diff --git a/atr/tasks/gha.py b/atr/tasks/gha.py
index 5e174c7..9ccaef8 100644
--- a/atr/tasks/gha.py
+++ b/atr/tasks/gha.py
@@ -15,14 +15,17 @@
 # specific language governing permissions and limitations
 # under the License.
 import asyncio
+import datetime
 import json
 import uuid
 from collections.abc import Callable
 from typing import Any, Final, NoReturn
 
 import aiohttp
+import pydantic
 
 import atr.config as config
+import atr.db as db
 import atr.log as log
 import atr.models.results as results
 import atr.models.schema as schema
@@ -30,7 +33,9 @@ import atr.models.sql as sql
 
 # import atr.shared as shared
 import atr.storage as storage
+import atr.tasks as tasks
 import atr.tasks.checks as checks
+from atr.models.results import DistributionWorkflowStatus
 
 _BASE_URL: Final[str] = "https://api.github.com/repos";
 _IN_PROGRESS_STATUSES: Final[list[str]] = ["in_progress", "queued", 
"requested", "waiting", "pending", "expected"]
@@ -56,6 +61,11 @@ class DistributionWorkflow(schema.Strict):
     name: str = schema.description("Name of the run")
 
 
+class WorkflowStatusCheck(schema.Strict):
+    run_id: int | None = schema.description("Run ID")
+    next_schedule: int = pydantic.Field(default=0, description="The next 
scheduled time (in minutes)")
+
+
 @checks.with_model(DistributionWorkflow)
 async def trigger_workflow(args: DistributionWorkflow, *, task_id: int | None 
= None) -> results.Results | None:
     unique_id = f"atr-dist-{args.name}-{uuid.uuid4()}"
@@ -112,6 +122,78 @@ async def trigger_workflow(args: DistributionWorkflow, *, 
task_id: int | None =
         )
 
 
[email protected]_model(WorkflowStatusCheck)
+async def status_check(args: WorkflowStatusCheck) -> 
DistributionWorkflowStatus:
+    """Check remote workflow statuses."""
+
+    headers = {"Accept": "application/vnd.github+json", "Authorization": 
f"Bearer {config.get().GITHUB_TOKEN}"}
+    log.info("Updating Github workflow statuses from apache/tooling-actions")
+    runs = []
+    try:
+        async with aiohttp.ClientSession() as session:
+            try:
+                async with session.get(
+                    
f"{_BASE_URL}/apache/tooling-actions/actions/runs?event=workflow_dispatch", 
headers=headers
+                ) as response:
+                    response.raise_for_status()
+                    resp_json = await response.json()
+                    runs = resp_json.get("workflow_runs", [])
+            except aiohttp.ClientResponseError as e:
+                _fail(f"Failed to lookup GitHub workflows: {e.message} 
({e.status})")
+
+        updated_count = 0
+
+        if len(runs) > 0:
+            async with db.session() as data:
+                pending_runs = await 
data.workflow_status(status_in=[*_IN_PROGRESS_STATUSES, ""]).all()
+
+                for pending in pending_runs:
+                    # Find matching workflow run from GitHub API
+                    matching_run = next(
+                        (
+                            r
+                            for r in runs
+                            if r.get("id") == pending.run_id and r.get("path", 
"").endswith(f"/{pending.workflow_id}")
+                        ),
+                        None,
+                    )
+
+                    if matching_run:
+                        new_status = matching_run.get("status", "")
+                        new_message = matching_run.get("conclusion")
+                        if new_message == "failure":
+                            new_status = "failed"
+                            new_message = "GitHub workflow failed"
+
+                        # Update status if it has changed
+                        if new_status != pending.status:
+                            pending.status = new_status
+                            if new_message:
+                                pending.message = new_message
+                            updated_count += 1
+                            log.info(
+                                f"Updated workflow {pending.workflow_id} run 
{pending.run_id} to status {new_status}"
+                            )
+                # TODO: If we can't find this run ID in the bulk response, we 
could check it directly by ID in the API
+                await data.commit()
+
+        log.info(
+            f"Workflow status update completed: updated {updated_count} 
workflow(s)",
+        )
+
+        # Schedule next update
+        await _schedule_next(args)
+
+        return results.DistributionWorkflowStatus(
+            kind="distribution_workflow_status",
+        )
+
+    except aiohttp.ClientError as e:
+        _fail(f"Failed to fetch workflow data from GitHub: {e!s}")
+    except Exception as e:
+        _fail(f"Unexpected error during workflow status update: {e!s}")
+
+
 def _fail(message: str) -> NoReturn:
     log.error(message)
     raise RuntimeError(message)
@@ -192,6 +274,15 @@ async def _request_and_retry(
     return None
 
 
+async def _schedule_next(args: WorkflowStatusCheck):
+    if args.next_schedule:
+        next_schedule = datetime.datetime.now(datetime.UTC) + 
datetime.timedelta(minutes=args.next_schedule)
+        await tasks.workflow_update("system", schedule=next_schedule, 
schedule_next=True)
+        log.info(
+            f"Scheduled next workflow status update for: 
{next_schedule.strftime('%Y-%m-%d %H:%M:%S')}",
+        )
+
+
 #
 # async def _wait_for_completion(
 #     session: aiohttp.ClientSession,
diff --git a/atr/tasks/metadata.py b/atr/tasks/metadata.py
index 561afeb..ba2b6e5 100644
--- a/atr/tasks/metadata.py
+++ b/atr/tasks/metadata.py
@@ -15,12 +15,16 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import datetime
+
 import aiohttp
+import pydantic
 
 import atr.datasources.apache as apache
 import atr.log as log
 import atr.models.results as results
 import atr.models.schema as schema
+import atr.tasks as tasks
 import atr.tasks.checks as checks
 
 
@@ -28,6 +32,7 @@ class Update(schema.Strict):
     """Arguments for the task to update metadata from remote data sources."""
 
     asf_uid: str = schema.description("The ASF UID of the user triggering the 
update")
+    next_schedule: int = pydantic.Field(default=0, description="The next 
scheduled time (in minutes)")
 
 
 class UpdateError(Exception):
@@ -45,6 +50,15 @@ async def update(args: Update) -> results.Results | None:
         log.info(
             f"Metadata update completed successfully: added {added_count}, 
updated {updated_count}",
         )
+
+        # Schedule next update
+        if args.next_schedule and args.next_schedule > 0:
+            next_schedule = datetime.datetime.now(datetime.UTC) + 
datetime.timedelta(minutes=args.next_schedule)
+            await tasks.metadata_update(args.asf_uid, schedule=next_schedule, 
schedule_next=True)
+            log.info(
+                f"Scheduled next metadata update for: 
{next_schedule.strftime('%Y-%m-%d %H:%M:%S')}",
+            )
+
         return results.MetadataUpdate(
             kind="metadata_update",
             added_count=added_count,
diff --git a/atr/worker.py b/atr/worker.py
index 79e2823..db3a9ac 100644
--- a/atr/worker.py
+++ b/atr/worker.py
@@ -114,7 +114,12 @@ async def _task_next_claim() -> tuple[int, str, list[str] 
| dict[str, Any]] | No
             # Get the ID of the oldest queued task
             oldest_queued_task = (
                 sqlmodel.select(sql.Task.id)
-                .where(sql.Task.status == task.QUEUED)
+                .where(
+                    sqlmodel.and_(
+                        sql.Task.status == task.QUEUED,
+                        sql.Task.added <= datetime.datetime.now(datetime.UTC) 
- datetime.timedelta(seconds=2),
+                    )
+                )
                 
.order_by(sql.validate_instrumented_attribute(sql.Task.added).asc())
                 .limit(1)
             )


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to