This is an automated email from the ASF dual-hosted git repository.

arm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git

commit 2e99ba3410485f23a5d1886f2f39a758c3a6ac15
Author: Alastair McFarlane <[email protected]>
AuthorDate: Fri Jan 16 10:11:50 2026 +0000

    Move asf_uid to args model and tweak logic to clear scheduled tasks
---
 atr/models/sql.py     | 2 +-
 atr/tasks/__init__.py | 5 ++---
 atr/tasks/gha.py      | 9 +++++----
 atr/worker.py         | 5 ++---
 4 files changed, 10 insertions(+), 11 deletions(-)

diff --git a/atr/models/sql.py b/atr/models/sql.py
index 5891a04..444c134 100644
--- a/atr/models/sql.py
+++ b/atr/models/sql.py
@@ -358,7 +358,7 @@ class Task(sqlmodel.SQLModel, table=True):
         default_factory=lambda: datetime.datetime.now(datetime.UTC),
         sa_column=sqlalchemy.Column(UTCDateTime, index=True),
     )
-    scheduled: datetime.datetime = sqlmodel.Field(
+    scheduled: datetime.datetime | None = sqlmodel.Field(
         default=None,
         sa_column=sqlalchemy.Column(UTCDateTime, index=True),
     )
diff --git a/atr/tasks/__init__.py b/atr/tasks/__init__.py
index b575a83..325a7e6 100644
--- a/atr/tasks/__init__.py
+++ b/atr/tasks/__init__.py
@@ -64,7 +64,6 @@ async def clear_scheduled(caller_data: db.Session | None = 
None):
     """Clear all future scheduled tasks of the given types."""
     async with db.ensure_session(caller_data) as data:
         via = sql.validate_instrumented_attribute
-        now = datetime.datetime.now(datetime.UTC)
 
         delete_stmt = sqlmodel.delete(sql.Task).where(
             via(sql.Task.task_type).in_(
@@ -74,7 +73,7 @@ async def clear_scheduled(caller_data: db.Session | None = 
None):
                 ]
             ),
             via(sql.Task.status) == sql.TaskStatus.QUEUED,
-            sqlmodel.or_(via(sql.Task.scheduled).is_(None), 
via(sql.Task.scheduled) > now),
+            via(sql.Task.scheduled).is_not(None),
         )
 
         await data.execute(delete_stmt)
@@ -302,7 +301,7 @@ async def workflow_update(
     schedule_next: bool = False,
 ) -> sql.Task:
     """Queue a workflow status update task."""
-    args = gha.WorkflowStatusCheck(next_schedule_seconds=0, run_id=0)
+    args = gha.WorkflowStatusCheck(next_schedule_seconds=0, run_id=0, 
asf_uid=asf_uid)
     if schedule_next:
         args.next_schedule_seconds = 2 * 60
     async with db.ensure_session(caller_data) as data:
diff --git a/atr/tasks/gha.py b/atr/tasks/gha.py
index 1e8eda1..0baf302 100644
--- a/atr/tasks/gha.py
+++ b/atr/tasks/gha.py
@@ -64,6 +64,7 @@ class DistributionWorkflow(schema.Strict):
 class WorkflowStatusCheck(schema.Strict):
     run_id: int | None = schema.description("Run ID")
     next_schedule_seconds: int = pydantic.Field(default=0, description="The 
next scheduled time")
+    asf_uid: str = schema.description("ASF UID of the user triggering the 
workflow")
 
 
 @checks.with_model(DistributionWorkflow)
@@ -123,7 +124,7 @@ async def trigger_workflow(args: DistributionWorkflow, *, 
task_id: int | None =
 
 
 @checks.with_model(WorkflowStatusCheck)
-async def status_check(args: WorkflowStatusCheck, asf_uid: str) -> 
DistributionWorkflowStatus:
+async def status_check(args: WorkflowStatusCheck) -> 
DistributionWorkflowStatus:
     """Check remote workflow statuses."""
 
     headers = {"Accept": "application/vnd.github+json", "Authorization": 
f"Bearer {config.get().GITHUB_TOKEN}"}
@@ -182,7 +183,7 @@ async def status_check(args: WorkflowStatusCheck, asf_uid: 
str) -> DistributionW
         )
 
         # Schedule next update
-        await _schedule_next(args, asf_uid)
+        await _schedule_next(args)
 
         return results.DistributionWorkflowStatus(
             kind="distribution_workflow_status",
@@ -274,10 +275,10 @@ async def _request_and_retry(
     return None
 
 
-async def _schedule_next(args: WorkflowStatusCheck, asf_uid: str) -> None:
+async def _schedule_next(args: WorkflowStatusCheck) -> None:
     if args.next_schedule_seconds:
         next_schedule = datetime.datetime.now(datetime.UTC) + 
datetime.timedelta(seconds=args.next_schedule_seconds)
-        await tasks.workflow_update(asf_uid, schedule=next_schedule, 
schedule_next=True)
+        await tasks.workflow_update(args.asf_uid, schedule=next_schedule, 
schedule_next=True)
         log.info(
             f"Scheduled next workflow status update for: 
{next_schedule.strftime('%Y-%m-%d %H:%M:%S')}",
         )
diff --git a/atr/worker.py b/atr/worker.py
index aa19e28..b1b8f9c 100644
--- a/atr/worker.py
+++ b/atr/worker.py
@@ -120,7 +120,8 @@ async def _task_next_claim() -> tuple[int, str, list[str] | 
dict[str, Any], str]
                     sqlmodel.and_(
                         sql.Task.status == task.QUEUED,
                         sqlmodel.or_(
-                            via(sql.Task.scheduled).is_(None), 
sql.Task.scheduled <= datetime.datetime.now(datetime.UTC)
+                            via(sql.Task.scheduled).is_(None),
+                            via(sql.Task.scheduled) <= 
datetime.datetime.now(datetime.UTC),
                         ),
                     )
                 )
@@ -178,8 +179,6 @@ async def _task_process(task_id: int, task_type: str, 
task_args: list[str] | dic
             additional_kwargs = {}
             if sig.parameters.get("task_id") is not None:
                 additional_kwargs["task_id"] = task_id
-            if sig.parameters.get("asf_uid") is not None:
-                additional_kwargs["asf_uid"] = asf_uid
             handler_result = await handler(task_args, **additional_kwargs)
 
         task_results = handler_result


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to