sbp commented on code in PR #529:
URL: 
https://github.com/apache/tooling-trusted-releases/pull/529#discussion_r2694609299


##########
atr/server.py:
##########
@@ -463,20 +464,36 @@ async def _initialise_test_environment() -> None:
             await data.commit()
 
 
-async def _metadata_update_scheduler() -> None:
-    """Periodically schedule remote metadata updates."""
-    # Wait one minute to allow the server to start
-    await asyncio.sleep(60)
+#
+# async def _metadata_update_scheduler() -> None:
+#     """Periodically schedule remote metadata updates."""
+#     # Wait one minute to allow the server to start
+#     await asyncio.sleep(60)
+#
+#     while True:
+#         try:
+#             task = await tasks.metadata_update(asf_uid="system")
+#             log.info(f"Scheduled remote metadata update with ID {task.id}")
+#         except Exception as e:
+#             log.exception(f"Failed to schedule remote metadata update: 
{e!s}")
+#
+#         # Schedule next update in 24 hours
+#         await asyncio.sleep(86400)
 
-    while True:
-        try:
-            task = await tasks.metadata_update(asf_uid="system")
-            log.info(f"Scheduled remote metadata update with ID {task.id}")
-        except Exception as e:
-            log.exception(f"Failed to schedule remote metadata update: {e!s}")
 
-        # Schedule next update in 24 hours
-        await asyncio.sleep(86400)
+async def _register_recurrent_tasks() -> None:
+    """Schedule recurring tasks"""
+    # Wait one minute to allow the server to start
+    await asyncio.sleep(30)

Review Comment:
   Because `_register_recurrent_tasks` is no longer run as a task, the server 
steps in to the `await` here and it blocks the server startup. The fix is to 
run this as a task, as before.



##########
atr/server.py:
##########
@@ -463,20 +464,36 @@ async def _initialise_test_environment() -> None:
             await data.commit()
 
 
-async def _metadata_update_scheduler() -> None:
-    """Periodically schedule remote metadata updates."""
-    # Wait one minute to allow the server to start
-    await asyncio.sleep(60)
+#
+# async def _metadata_update_scheduler() -> None:
+#     """Periodically schedule remote metadata updates."""
+#     # Wait one minute to allow the server to start
+#     await asyncio.sleep(60)
+#
+#     while True:
+#         try:
+#             task = await tasks.metadata_update(asf_uid="system")
+#             log.info(f"Scheduled remote metadata update with ID {task.id}")
+#         except Exception as e:
+#             log.exception(f"Failed to schedule remote metadata update: 
{e!s}")
+#
+#         # Schedule next update in 24 hours
+#         await asyncio.sleep(86400)
 
-    while True:
-        try:
-            task = await tasks.metadata_update(asf_uid="system")
-            log.info(f"Scheduled remote metadata update with ID {task.id}")
-        except Exception as e:
-            log.exception(f"Failed to schedule remote metadata update: {e!s}")
 
-        # Schedule next update in 24 hours
-        await asyncio.sleep(86400)
+async def _register_recurrent_tasks() -> None:
+    """Schedule recurring tasks"""
+    # Wait one minute to allow the server to start

Review Comment:
   This is out of sync with the code that it's commenting, which has changed to 
30s.



##########
atr/tasks/__init__.py:
##########
@@ -57,6 +60,27 @@ async def asc_checks(asf_uid: str, release: sql.Release, 
revision: str, signatur
     return tasks
 
 
+async def clear_scheduled(caller_data: db.Session | None = None):
+    """Clear all future scheduled tasks of the given types."""
+    async with db.ensure_session(caller_data) as data:
+        via = sql.validate_instrumented_attribute
+        now = datetime.datetime.now(datetime.UTC)
+
+        delete_stmt = sqlmodel.delete(sql.Task).where(
+            via(sql.Task.task_type).in_(
+                [
+                    sql.TaskType.METADATA_UPDATE,
+                    sql.TaskType.WORKFLOW_STATUS,
+                ]
+            ),
+            via(sql.Task.status) == sql.TaskStatus.QUEUED,
+            via(sql.Task.added) > now,

Review Comment:
   Perhaps we should remove this and clear all such tasks? There could be a 
little gap where a task is 1s old and hence not yet claimed (because of the 2s 
buffer), and therefore would not be removed either.



##########
atr/server.py:
##########
@@ -463,20 +464,36 @@ async def _initialise_test_environment() -> None:
             await data.commit()
 
 
-async def _metadata_update_scheduler() -> None:
-    """Periodically schedule remote metadata updates."""
-    # Wait one minute to allow the server to start
-    await asyncio.sleep(60)
+#
+# async def _metadata_update_scheduler() -> None:
+#     """Periodically schedule remote metadata updates."""
+#     # Wait one minute to allow the server to start
+#     await asyncio.sleep(60)
+#
+#     while True:
+#         try:
+#             task = await tasks.metadata_update(asf_uid="system")
+#             log.info(f"Scheduled remote metadata update with ID {task.id}")
+#         except Exception as e:
+#             log.exception(f"Failed to schedule remote metadata update: 
{e!s}")
+#
+#         # Schedule next update in 24 hours
+#         await asyncio.sleep(86400)
 
-    while True:
-        try:
-            task = await tasks.metadata_update(asf_uid="system")
-            log.info(f"Scheduled remote metadata update with ID {task.id}")
-        except Exception as e:
-            log.exception(f"Failed to schedule remote metadata update: {e!s}")
 
-        # Schedule next update in 24 hours
-        await asyncio.sleep(86400)
+async def _register_recurrent_tasks() -> None:
+    """Schedule recurring tasks"""
+    # Wait one minute to allow the server to start
+    await asyncio.sleep(30)
+    try:
+        await tasks.clear_scheduled()
+        metadata = await tasks.metadata_update(asf_uid="system", 
schedule_next=True)

Review Comment:
   This is scheduled to run straight away, but for a long time I've been 
wondering whether I should have added a random wait or not, probably between 60 
and 600 seconds or so. Now that we have multiple tasks set to start straight 
away, I think we probably should do this to prevent thundering herd.



##########
atr/tasks/metadata.py:
##########
@@ -45,6 +50,15 @@ async def update(args: Update) -> results.Results | None:
         log.info(
             f"Metadata update completed successfully: added {added_count}, 
updated {updated_count}",
         )
+
+        # Schedule next update
+        if args.next_schedule and args.next_schedule > 0:

Review Comment:
   `(args.next_schedule > 0)`



##########
atr/server.py:
##########
@@ -463,20 +464,36 @@ async def _initialise_test_environment() -> None:
             await data.commit()
 
 
-async def _metadata_update_scheduler() -> None:
-    """Periodically schedule remote metadata updates."""
-    # Wait one minute to allow the server to start
-    await asyncio.sleep(60)
+#
+# async def _metadata_update_scheduler() -> None:
+#     """Periodically schedule remote metadata updates."""
+#     # Wait one minute to allow the server to start
+#     await asyncio.sleep(60)
+#
+#     while True:
+#         try:
+#             task = await tasks.metadata_update(asf_uid="system")
+#             log.info(f"Scheduled remote metadata update with ID {task.id}")
+#         except Exception as e:
+#             log.exception(f"Failed to schedule remote metadata update: 
{e!s}")
+#
+#         # Schedule next update in 24 hours
+#         await asyncio.sleep(86400)
 
-    while True:
-        try:
-            task = await tasks.metadata_update(asf_uid="system")
-            log.info(f"Scheduled remote metadata update with ID {task.id}")
-        except Exception as e:
-            log.exception(f"Failed to schedule remote metadata update: {e!s}")
 
-        # Schedule next update in 24 hours
-        await asyncio.sleep(86400)
+async def _register_recurrent_tasks() -> None:
+    """Schedule recurring tasks"""
+    # Wait one minute to allow the server to start
+    await asyncio.sleep(30)
+    try:
+        await tasks.clear_scheduled()
+        metadata = await tasks.metadata_update(asf_uid="system", 
schedule_next=True)
+        log.info(f"Scheduled remote metadata update with ID {metadata.id}")
+        workflow = await tasks.workflow_update(asf_uid="system", 
schedule_next=True)

Review Comment:
   Must prevent thundering herd here too.



##########
atr/tasks/metadata.py:
##########
@@ -45,6 +50,15 @@ async def update(args: Update) -> results.Results | None:
         log.info(
             f"Metadata update completed successfully: added {added_count}, 
updated {updated_count}",
         )
+
+        # Schedule next update
+        if args.next_schedule and args.next_schedule > 0:

Review Comment:
   Also I'd probably call this `next_schedule_minutes` if it's in minutes, but 
even better would be to use seconds. Everything is measured in seconds these 
days. I mean, these seconds.



##########
atr/worker.py:
##########
@@ -114,7 +114,12 @@ async def _task_next_claim() -> tuple[int, str, list[str] 
| dict[str, Any]] | No
             # Get the ID of the oldest queued task
             oldest_queued_task = (
                 sqlmodel.select(sql.Task.id)
-                .where(sql.Task.status == task.QUEUED)
+                .where(
+                    sqlmodel.and_(
+                        sql.Task.status == task.QUEUED,
+                        sql.Task.added <= datetime.datetime.now(datetime.UTC) 
- datetime.timedelta(seconds=2),

Review Comment:
   Why `datetime.timedelta(seconds=2)`? What does this little buffer period do?



##########
atr/tasks/gha.py:
##########
@@ -192,6 +274,15 @@ async def _request_and_retry(
     return None
 
 
+async def _schedule_next(args: WorkflowStatusCheck):
+    if args.next_schedule:
+        next_schedule = datetime.datetime.now(datetime.UTC) + 
datetime.timedelta(minutes=args.next_schedule)
+        await tasks.workflow_update("system", schedule=next_schedule, 
schedule_next=True)

Review Comment:
   We could change `system` here if the ASF UID were part of the args to the 
task itself, like it is for the metadata update task.



##########
atr/worker.py:
##########
@@ -114,7 +114,12 @@ async def _task_next_claim() -> tuple[int, str, list[str] 
| dict[str, Any]] | No
             # Get the ID of the oldest queued task
             oldest_queued_task = (
                 sqlmodel.select(sql.Task.id)
-                .where(sql.Task.status == task.QUEUED)
+                .where(
+                    sqlmodel.and_(
+                        sql.Task.status == task.QUEUED,
+                        sql.Task.added <= datetime.datetime.now(datetime.UTC) 
- datetime.timedelta(seconds=2),

Review Comment:
   `(datetime.datetime.now(datetime.UTC) - datetime.timedelta(seconds=2))`



##########
atr/tasks/gha.py:
##########
@@ -56,6 +61,11 @@ class DistributionWorkflow(schema.Strict):
     name: str = schema.description("Name of the run")
 
 
+class WorkflowStatusCheck(schema.Strict):
+    run_id: int | None = schema.description("Run ID")

Review Comment:
   Is `run_id` still used?



##########
atr/tasks/__init__.py:
##########
@@ -259,6 +295,33 @@ async def tar_gz_checks(asf_uid: str, release: 
sql.Release, revision: str, path:
     return tasks
 
 
+async def workflow_update(
+    asf_uid: str,
+    caller_data: db.Session | None = None,
+    schedule: datetime.datetime | None = None,
+    schedule_next: bool = False,
+) -> sql.Task:
+    """Queue a workflow status update task."""
+    args = gha.WorkflowStatusCheck(next_schedule=0, run_id=0)
+    if schedule_next:
+        args.next_schedule = 2
+    async with db.ensure_session(caller_data) as data:
+        task = sql.Task(
+            status=sql.TaskStatus.QUEUED,
+            task_type=sql.TaskType.WORKFLOW_STATUS,
+            task_args=args.model_dump(),
+            asf_uid=asf_uid,

Review Comment:
   Maybe the ASF UID should go in the args to the task itself? The metadata 
update task does it that way, and then when it reschedules itself it still 
knows the ASF UID it was started with. Currently this is always `system`, but 
if we allow it to be stopped and started by an admin that would change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to