sbp commented on code in PR #529:
URL: https://github.com/apache/tooling-trusted-releases/pull/529#discussion_r2694609299
##########
atr/server.py:
##########
@@ -463,20 +464,36 @@ async def _initialise_test_environment() -> None:
await data.commit()
-async def _metadata_update_scheduler() -> None:
- """Periodically schedule remote metadata updates."""
- # Wait one minute to allow the server to start
- await asyncio.sleep(60)
+#
+# async def _metadata_update_scheduler() -> None:
+# """Periodically schedule remote metadata updates."""
+# # Wait one minute to allow the server to start
+# await asyncio.sleep(60)
+#
+# while True:
+# try:
+# task = await tasks.metadata_update(asf_uid="system")
+# log.info(f"Scheduled remote metadata update with ID {task.id}")
+# except Exception as e:
+#             log.exception(f"Failed to schedule remote metadata update: {e!s}")
+#
+# # Schedule next update in 24 hours
+# await asyncio.sleep(86400)
- while True:
- try:
- task = await tasks.metadata_update(asf_uid="system")
- log.info(f"Scheduled remote metadata update with ID {task.id}")
- except Exception as e:
- log.exception(f"Failed to schedule remote metadata update: {e!s}")
- # Schedule next update in 24 hours
- await asyncio.sleep(86400)
+async def _register_recurrent_tasks() -> None:
+ """Schedule recurring tasks"""
+ # Wait one minute to allow the server to start
+ await asyncio.sleep(30)
Review Comment:
Because `_register_recurrent_tasks` is no longer run as a task, the server
steps into the `await` here, which blocks server startup. The fix is to
run this as a task, as before.
##########
atr/server.py:
##########
@@ -463,20 +464,36 @@ async def _initialise_test_environment() -> None:
await data.commit()
-async def _metadata_update_scheduler() -> None:
- """Periodically schedule remote metadata updates."""
- # Wait one minute to allow the server to start
- await asyncio.sleep(60)
+#
+# async def _metadata_update_scheduler() -> None:
+# """Periodically schedule remote metadata updates."""
+# # Wait one minute to allow the server to start
+# await asyncio.sleep(60)
+#
+# while True:
+# try:
+# task = await tasks.metadata_update(asf_uid="system")
+# log.info(f"Scheduled remote metadata update with ID {task.id}")
+# except Exception as e:
+#             log.exception(f"Failed to schedule remote metadata update: {e!s}")
+#
+# # Schedule next update in 24 hours
+# await asyncio.sleep(86400)
- while True:
- try:
- task = await tasks.metadata_update(asf_uid="system")
- log.info(f"Scheduled remote metadata update with ID {task.id}")
- except Exception as e:
- log.exception(f"Failed to schedule remote metadata update: {e!s}")
- # Schedule next update in 24 hours
- await asyncio.sleep(86400)
+async def _register_recurrent_tasks() -> None:
+ """Schedule recurring tasks"""
+ # Wait one minute to allow the server to start
Review Comment:
This is out of sync with the code that it's commenting, which has changed to
30s.
##########
atr/tasks/__init__.py:
##########
@@ -57,6 +60,27 @@ async def asc_checks(asf_uid: str, release: sql.Release, revision: str, signatur
return tasks
+async def clear_scheduled(caller_data: db.Session | None = None):
+ """Clear all future scheduled tasks of the given types."""
+ async with db.ensure_session(caller_data) as data:
+ via = sql.validate_instrumented_attribute
+ now = datetime.datetime.now(datetime.UTC)
+
+ delete_stmt = sqlmodel.delete(sql.Task).where(
+ via(sql.Task.task_type).in_(
+ [
+ sql.TaskType.METADATA_UPDATE,
+ sql.TaskType.WORKFLOW_STATUS,
+ ]
+ ),
+ via(sql.Task.status) == sql.TaskStatus.QUEUED,
+ via(sql.Task.added) > now,
Review Comment:
Perhaps we should remove this and clear all such tasks? There could be a
little gap where a task is 1s old and hence not yet claimed (because of the 2s
buffer), and therefore would not be removed either.
##########
atr/server.py:
##########
@@ -463,20 +464,36 @@ async def _initialise_test_environment() -> None:
await data.commit()
-async def _metadata_update_scheduler() -> None:
- """Periodically schedule remote metadata updates."""
- # Wait one minute to allow the server to start
- await asyncio.sleep(60)
+#
+# async def _metadata_update_scheduler() -> None:
+# """Periodically schedule remote metadata updates."""
+# # Wait one minute to allow the server to start
+# await asyncio.sleep(60)
+#
+# while True:
+# try:
+# task = await tasks.metadata_update(asf_uid="system")
+# log.info(f"Scheduled remote metadata update with ID {task.id}")
+# except Exception as e:
+#             log.exception(f"Failed to schedule remote metadata update: {e!s}")
+#
+# # Schedule next update in 24 hours
+# await asyncio.sleep(86400)
- while True:
- try:
- task = await tasks.metadata_update(asf_uid="system")
- log.info(f"Scheduled remote metadata update with ID {task.id}")
- except Exception as e:
- log.exception(f"Failed to schedule remote metadata update: {e!s}")
- # Schedule next update in 24 hours
- await asyncio.sleep(86400)
+async def _register_recurrent_tasks() -> None:
+ """Schedule recurring tasks"""
+ # Wait one minute to allow the server to start
+ await asyncio.sleep(30)
+ try:
+ await tasks.clear_scheduled()
+        metadata = await tasks.metadata_update(asf_uid="system", schedule_next=True)
Review Comment:
This is scheduled to run straight away, but for a long time I've been
wondering whether I should have added a random wait or not, probably between 60
and 600 seconds or so. Now that we have multiple tasks set to start straight
away, I think we probably should do this to prevent thundering herd.
##########
atr/tasks/metadata.py:
##########
@@ -45,6 +50,15 @@ async def update(args: Update) -> results.Results | None:
log.info(
        f"Metadata update completed successfully: added {added_count}, updated {updated_count}",
)
+
+ # Schedule next update
+ if args.next_schedule and args.next_schedule > 0:
Review Comment:
`(args.next_schedule > 0)`
##########
atr/server.py:
##########
@@ -463,20 +464,36 @@ async def _initialise_test_environment() -> None:
await data.commit()
-async def _metadata_update_scheduler() -> None:
- """Periodically schedule remote metadata updates."""
- # Wait one minute to allow the server to start
- await asyncio.sleep(60)
+#
+# async def _metadata_update_scheduler() -> None:
+# """Periodically schedule remote metadata updates."""
+# # Wait one minute to allow the server to start
+# await asyncio.sleep(60)
+#
+# while True:
+# try:
+# task = await tasks.metadata_update(asf_uid="system")
+# log.info(f"Scheduled remote metadata update with ID {task.id}")
+# except Exception as e:
+#             log.exception(f"Failed to schedule remote metadata update: {e!s}")
+#
+# # Schedule next update in 24 hours
+# await asyncio.sleep(86400)
- while True:
- try:
- task = await tasks.metadata_update(asf_uid="system")
- log.info(f"Scheduled remote metadata update with ID {task.id}")
- except Exception as e:
- log.exception(f"Failed to schedule remote metadata update: {e!s}")
- # Schedule next update in 24 hours
- await asyncio.sleep(86400)
+async def _register_recurrent_tasks() -> None:
+ """Schedule recurring tasks"""
+ # Wait one minute to allow the server to start
+ await asyncio.sleep(30)
+ try:
+ await tasks.clear_scheduled()
+        metadata = await tasks.metadata_update(asf_uid="system", schedule_next=True)
+ log.info(f"Scheduled remote metadata update with ID {metadata.id}")
+        workflow = await tasks.workflow_update(asf_uid="system", schedule_next=True)
Review Comment:
Must prevent thundering herd here too.
##########
atr/tasks/metadata.py:
##########
@@ -45,6 +50,15 @@ async def update(args: Update) -> results.Results | None:
log.info(
        f"Metadata update completed successfully: added {added_count}, updated {updated_count}",
)
+
+ # Schedule next update
+ if args.next_schedule and args.next_schedule > 0:
Review Comment:
Also I'd probably call this `next_schedule_minutes` if it's in minutes, but
even better would be to use seconds. Everything is measured in seconds these
days. I mean, these seconds.
##########
atr/worker.py:
##########
@@ -114,7 +114,12 @@ async def _task_next_claim() -> tuple[int, str, list[str] | dict[str, Any]] | No
# Get the ID of the oldest queued task
oldest_queued_task = (
sqlmodel.select(sql.Task.id)
- .where(sql.Task.status == task.QUEUED)
+ .where(
+ sqlmodel.and_(
+ sql.Task.status == task.QUEUED,
+                sql.Task.added <= datetime.datetime.now(datetime.UTC) - datetime.timedelta(seconds=2),
Review Comment:
Why `datetime.timedelta(seconds=2)`? What does this little buffer period do?
##########
atr/tasks/gha.py:
##########
@@ -192,6 +274,15 @@ async def _request_and_retry(
return None
+async def _schedule_next(args: WorkflowStatusCheck):
+ if args.next_schedule:
+        next_schedule = datetime.datetime.now(datetime.UTC) + datetime.timedelta(minutes=args.next_schedule)
+        await tasks.workflow_update("system", schedule=next_schedule, schedule_next=True)
Review Comment:
We could change `system` here if the ASF UID were part of the args to the
task itself, like it is for the metadata update task.
##########
atr/worker.py:
##########
@@ -114,7 +114,12 @@ async def _task_next_claim() -> tuple[int, str, list[str] | dict[str, Any]] | No
# Get the ID of the oldest queued task
oldest_queued_task = (
sqlmodel.select(sql.Task.id)
- .where(sql.Task.status == task.QUEUED)
+ .where(
+ sqlmodel.and_(
+ sql.Task.status == task.QUEUED,
+                sql.Task.added <= datetime.datetime.now(datetime.UTC) - datetime.timedelta(seconds=2),
Review Comment:
`(datetime.datetime.now(datetime.UTC) - datetime.timedelta(seconds=2))`
##########
atr/tasks/gha.py:
##########
@@ -56,6 +61,11 @@ class DistributionWorkflow(schema.Strict):
name: str = schema.description("Name of the run")
+class WorkflowStatusCheck(schema.Strict):
+ run_id: int | None = schema.description("Run ID")
Review Comment:
Is `run_id` still used?
##########
atr/tasks/__init__.py:
##########
@@ -259,6 +295,33 @@ async def tar_gz_checks(asf_uid: str, release: sql.Release, revision: str, path:
return tasks
+async def workflow_update(
+ asf_uid: str,
+ caller_data: db.Session | None = None,
+ schedule: datetime.datetime | None = None,
+ schedule_next: bool = False,
+) -> sql.Task:
+ """Queue a workflow status update task."""
+ args = gha.WorkflowStatusCheck(next_schedule=0, run_id=0)
+ if schedule_next:
+ args.next_schedule = 2
+ async with db.ensure_session(caller_data) as data:
+ task = sql.Task(
+ status=sql.TaskStatus.QUEUED,
+ task_type=sql.TaskType.WORKFLOW_STATUS,
+ task_args=args.model_dump(),
+ asf_uid=asf_uid,
Review Comment:
Maybe the ASF UID should go in the args to the task itself? The metadata
update task does it that way, and then when it reschedules itself it still
knows the ASF UID it was started with. Currently this is always `system`, but
if we allow it to be stopped and started by an admin that would change.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]