This is an automated email from the ASF dual-hosted git repository. sbp pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/tooling-atr-experiments.git
@blueprint.route("/tasks/add-random", methods=["GET", "POST"])
async def secret_tasks_add_random() -> str | Response:
    """Add a random task to the queue for testing.

    GET renders the form; POST enqueues a single "example" task with
    random arguments and redirects back to the form (POST/redirect/GET
    pattern, so a refresh does not re-submit).
    """
    if request.method == "POST":
        async with get_session() as db_session:
            async with db_session.begin():
                # NOTE(review): task_args is json.dumps()ed here, but
                # Task.task_args is backed by a JSON column, so the value is
                # serialized twice and stored as a JSON *string* rather than
                # an object — confirm which encoding the worker expects.
                task = Task(
                    id=None,
                    task_type="example",
                    task_args=json.dumps(
                        {
                            "random_number": secrets.randbelow(100),
                            "timestamp": datetime.datetime.now(datetime.UTC).isoformat(),
                        }
                    ),
                    status=TaskStatus.QUEUED,
                )
                db_session.add(task)
                # Flush so the autoincrement primary key is populated
                # before we read task.id.
                await db_session.flush()
                task_id = task.id
        # Flash only after the transaction has committed successfully.
        # Flashing inside the begin() block queued a success message even
        # when the commit itself subsequently failed.
        await flash(f"Added random task (ID: {task_id})", "success")
        return redirect(url_for("secret_blueprint.secret_tasks_add_random"))

    return await render_template("secret/tasks-add-random.html")
class TaskStatus(str, Enum):
    """Lifecycle status of a task in the task queue."""

    QUEUED = "queued"  # waiting to be claimed by a worker
    ACTIVE = "active"  # claimed; `started` and `pid` must be set
    COMPLETED = "completed"  # terminal success; `completed` must be set
    FAILED = "failed"  # terminal failure; `completed` and `error` must be set


class Task(SQLModel, table=True):
    """A task in the task queue.

    Workers claim the oldest QUEUED row (served by the (status, added)
    composite index), record their pid and start time, and finish by
    marking the row COMPLETED or FAILED.
    """

    id: int | None = Field(default=None, primary_key=True)
    # Discriminator used by the worker to decide how to process the task.
    task_type: str
    # JSON-encoded arguments for the task.
    # NOTE(review): callers currently pass a json.dumps()ed string into a
    # JSON column, which double-encodes the payload — confirm the intended
    # storage format (raw dict vs. pre-serialized string).
    task_args: str = Field(sa_column=Column(JSON))
    status: TaskStatus = Field(default=TaskStatus.QUEUED, index=True)
    added: datetime.datetime = Field(default_factory=lambda: datetime.datetime.now(datetime.UTC), index=True)
    started: datetime.datetime | None = None
    completed: datetime.datetime | None = None
    pid: int | None = None
    error: str | None = None

    __table_args__ = (
        # Composite index so workers can claim the oldest QUEUED task
        # efficiently (status filter + ORDER BY added).
        Index("ix_task_status_added", "status", "added"),
        # A CHECK constraint cannot enforce status *transitions* — it only
        # sees the row's final state. What it does guarantee is that each
        # status carries the fields that status requires (ACTIVE rows have
        # started/pid, terminal rows have completion metadata).
        # NOTE(review): the literals below are the enum *names*
        # ('QUEUED', ...), which is what SQLAlchemy's Enum type stores by
        # default; the raw SQL in atr/worker.py relies on the same
        # spelling. If the column is ever switched to store enum *values*
        # ('queued', ...), this constraint and the worker SQL must change
        # together.
        CheckConstraint(
            """
            (
                status = 'QUEUED'
                OR (status = 'ACTIVE' AND started IS NOT NULL AND pid IS NOT NULL)
                OR (status = 'COMPLETED' AND completed IS NOT NULL)
                OR (status = 'FAILED' AND completed IS NOT NULL AND error IS NOT NULL)
            )
            """,
            name="valid_task_status_transitions",
        ),
    )
"""worker.py - Task worker process for ATR"""

# TODO: If started is older than some threshold and status is active
# but the pid is no longer running, we can revert the task to
# status='QUEUED'. For this to work, ideally we need to check wall
# clock time as well as CPU time.

import datetime
import logging
import os
import resource
import signal
import sys
import time
from datetime import UTC
from typing import NoReturn

from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session

# Configure logging
logging.basicConfig(
    format="[%(asctime)s.%(msecs)03d] [%(process)d] [%(levelname)s] %(message)s",
    level=logging.INFO,
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)

# Resource limits, 5 minutes and 1GB
CPU_LIMIT_SECONDS = 300
MEMORY_LIMIT_BYTES = 1024 * 1024 * 1024

# Create database engine. The URL can be overridden for tests or
# deployments; it defaults to the application's on-disk SQLite database.
engine = create_engine(os.environ.get("ATR_DB_URL", "sqlite:///atr.db"), echo=False)


def get_db_session() -> Session:
    """Get a new database session."""
    return Session(engine)


def claim_next_task() -> tuple[int, str, str] | None:
    """
    Attempt to claim the oldest unclaimed task.

    Returns (task_id, task_type, task_args) if successful.
    Returns None if no tasks are available.
    """
    with get_db_session() as session:
        with session.begin():
            # Find and claim the oldest unclaimed task in one atomic
            # UPDATE ... RETURNING statement (requires SQLite 3.35+).
            # The inner "AND status = 'QUEUED'" guards against another
            # worker claiming the row between the subquery and the UPDATE.
            # We have an index on (status, added).
            result = session.execute(
                text("""
                    UPDATE task
                    SET started = :now, pid = :pid, status = 'ACTIVE'
                    WHERE id = (
                        SELECT id FROM task
                        WHERE status = 'QUEUED'
                        ORDER BY added ASC LIMIT 1
                    )
                    AND status = 'QUEUED'
                    RETURNING id, task_type, task_args
                """),
                {"now": datetime.datetime.now(UTC), "pid": os.getpid()},
            )
            task = result.first()
            if task:
                task_id, task_type, task_args = task
                logger.info(f"Claimed task {task_id} ({task_type}) with args {task_args}")
                return task_id, task_type, task_args

            return None


def process_task(task_id: int, task_type: str, task_args: str) -> None:
    """Process a claimed task, recording COMPLETED or FAILED on the row."""
    logger.info(f"Processing task {task_id} ({task_type}) with args {task_args}")
    try:
        # TODO: Implement actual task processing
        time.sleep(1)

        with get_db_session() as session:
            with session.begin():
                session.execute(
                    text("""
                        UPDATE task
                        SET completed = :now, status = 'COMPLETED'
                        WHERE id = :task_id
                    """),
                    {"now": datetime.datetime.now(UTC), "task_id": task_id},
                )
    except Exception as e:
        logger.exception(f"Task {task_id} failed")
        try:
            with get_db_session() as session:
                with session.begin():
                    session.execute(
                        text("""
                            UPDATE task
                            SET completed = :now, status = 'FAILED', error = :error
                            WHERE id = :task_id
                        """),
                        {"now": datetime.datetime.now(UTC), "task_id": task_id, "error": str(e)},
                    )
        except Exception:
            # Don't let a failure while *recording* the failure mask the
            # original error; the row stays ACTIVE and will be reaped by
            # the stale-task recovery described in the TODO above.
            logger.exception(f"Could not mark task {task_id} as FAILED")


def set_resource_limits() -> None:
    """Set CPU and memory limits for this process."""
    # Set CPU time limit
    try:
        resource.setrlimit(resource.RLIMIT_CPU, (CPU_LIMIT_SECONDS, CPU_LIMIT_SECONDS))
        logger.info(f"Set CPU time limit to {CPU_LIMIT_SECONDS} seconds")
    except ValueError as e:
        logger.warning(f"Could not set CPU time limit: {e}")

    # Set memory limit
    try:
        resource.setrlimit(resource.RLIMIT_AS, (MEMORY_LIMIT_BYTES, MEMORY_LIMIT_BYTES))
        logger.info(f"Set memory limit to {MEMORY_LIMIT_BYTES} bytes")
    except ValueError as e:
        logger.warning(f"Could not set memory limit: {e}")


def worker_loop() -> NoReturn:
    """Main worker loop: claim and process tasks forever."""
    logger.info(f"Worker starting (PID: {os.getpid()})")

    while True:
        try:
            task = claim_next_task()
            if task:
                task_id, task_type, task_args = task
                process_task(task_id, task_type, task_args)
            else:
                # No tasks available, wait 20ms before checking again
                time.sleep(0.02)
        except Exception as e:
            # TODO: Should probably be more robust about this
            logger.error(f"Worker loop error: {e}")
            time.sleep(1)


def signal_handler(signum: int, frame: object) -> None:
    """Handle termination signals gracefully."""
    # For RLIMIT_AS we'll generally get a SIGKILL, which cannot be caught.
    logger.info(f"Received signal {signum}, shutting down...")
    sys.exit(0)


def main() -> None:
    """Main entry point."""
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)
    # RLIMIT_CPU delivers SIGXCPU at the soft limit; register a handler so
    # the worker shuts down cleanly instead of dying uncaught. (This was
    # documented but never actually registered.)
    signal.signal(signal.SIGXCPU, signal_handler)

    set_resource_limits()
    worker_loop()


if __name__ == "__main__":
    main()
-revision: str = "b8dd95b83501" +revision: str = "5d63e3dd5e93" down_revision: str | None = None branch_labels: str | Sequence[str] | None = None depends_on: str | Sequence[str] | None = None --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tooling.apache.org For additional commands, e-mail: dev-h...@tooling.apache.org