This is an automated email from the ASF dual-hosted git repository.

sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-atr-experiments.git


The following commit(s) were added to refs/heads/main by this push:
     new ca145ab  Add an experimental worker module
ca145ab is described below

commit ca145abbb80c069b394ea86316c8cf46e28bee69
Author: Sean B. Palmer <s...@miscoranda.com>
AuthorDate: Wed Feb 19 21:10:21 2025 +0200

    Add an experimental worker module
---
 atr/blueprints/secret/secret.py                    |  33 ++++
 atr/db/models.py                                   |  49 +++++-
 atr/templates/secret/tasks-add-random.html         |  48 ++++++
 atr/worker.py                                      | 180 +++++++++++++++++++++
 ...al_schema.py => 5d63e3dd5e93_initial_schema.py} |   6 +-
 5 files changed, 312 insertions(+), 4 deletions(-)

diff --git a/atr/blueprints/secret/secret.py b/atr/blueprints/secret/secret.py
index 6dae033..5cb0e2c 100644
--- a/atr/blueprints/secret/secret.py
+++ b/atr/blueprints/secret/secret.py
@@ -15,7 +15,9 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import datetime
 import json
+import secrets
 
 import httpx
 from quart import current_app, flash, redirect, render_template, request, url_for
@@ -33,6 +35,8 @@ from atr.db.models import (
     ProductLine,
     PublicSigningKey,
     Release,
+    Task,
+    TaskStatus,
     VotePolicy,
 )
 from atr.db.service import get_pmcs
@@ -57,6 +61,7 @@ async def secret_data(model: str = "PMC") -> str:
         "DistributionChannel": DistributionChannel,
         "PublicSigningKey": PublicSigningKey,
         "PMCKeyLink": PMCKeyLink,
+        "Task": Task,
     }
 
     if model not in models:
@@ -187,3 +192,31 @@ async def secret_keys_delete_all() -> str:
                 await db_session.delete(key)
 
         return f"Deleted {count} keys"
+
+
+@blueprint.route("/tasks/add-random", methods=["GET", "POST"])
+async def secret_tasks_add_random() -> str | Response:
+    """Add a random task to the queue for testing."""
+    if request.method == "POST":
+        async with get_session() as db_session:
+            async with db_session.begin():
+                # Create a random task
+                task = Task(
+                    id=None,
+                    task_type="example",
+                    task_args=json.dumps(
+                        {
+                            "random_number": secrets.randbelow(100),
+                            "timestamp": 
datetime.datetime.now(datetime.UTC).isoformat(),
+                        }
+                    ),
+                    status=TaskStatus.QUEUED,
+                )
+                db_session.add(task)
+                # Flush to get the task ID
+                await db_session.flush()
+                await flash(f"Added random task (ID: {task.id})", "success")
+
+        return redirect(url_for("secret_blueprint.secret_tasks_add_random"))
+
+    return await render_template("secret/tasks-add-random.html")
diff --git a/atr/db/models.py b/atr/db/models.py
index aa0e58b..f7a37d8 100644
--- a/atr/db/models.py
+++ b/atr/db/models.py
@@ -22,7 +22,7 @@ from enum import Enum
 from typing import Optional
 
 from pydantic import BaseModel
-from sqlalchemy import JSON, Column
+from sqlalchemy import JSON, CheckConstraint, Column, Index
 from sqlmodel import Field, Relationship, SQLModel
 
 
@@ -184,6 +184,53 @@ class ReleasePhase(str, Enum):
     ARCHIVED = "archived"
 
 
+class TaskStatus(str, Enum):
+    """Status of a task in the task queue."""
+
+    # NOTE(review): the values here are lower-case, but the raw SQL in this
+    # commit compares the status column against the upper-case member names
+    # ('QUEUED', 'ACTIVE', ...) - presumably the column persists member names
+    # rather than values; confirm
+
+    # Waiting to be claimed by a worker
+    QUEUED = "queued"
+    # Claimed by a worker process, which records its start time and pid
+    ACTIVE = "active"
+    # Processing finished without raising
+    COMPLETED = "completed"
+    # Processing raised; the error text is recorded on the task
+    FAILED = "failed"
+
+
+class Task(SQLModel, table=True):
+    """A task in the task queue."""
+
+    id: int | None = Field(default=None, primary_key=True)
+    task_type: str
+    task_args: str = Field(sa_column=Column(JSON))
+    status: TaskStatus = Field(default=TaskStatus.QUEUED, index=True)
+    added: datetime.datetime = Field(default_factory=lambda: 
datetime.datetime.now(datetime.UTC), index=True)
+    started: datetime.datetime | None = None
+    completed: datetime.datetime | None = None
+    pid: int | None = None
+    error: str | None = None
+
+    # Create an index on status and added for efficient task claiming
+    __table_args__ = (
+        Index("ix_task_status_added", "status", "added"),
+        # Ensure valid status transitions:
+        # - QUEUED can transition to ACTIVE
+        # - ACTIVE can transition to COMPLETED or FAILED
+        # - COMPLETED and FAILED are terminal states
+        CheckConstraint(
+            """
+            (
+                -- Initial state is always valid
+                status = 'QUEUED'
+                -- QUEUED -> ACTIVE requires setting started time and pid
+                OR (status = 'ACTIVE' AND started IS NOT NULL AND pid IS NOT 
NULL)
+                -- ACTIVE -> COMPLETED requires setting completed time
+                OR (status = 'COMPLETED' AND completed IS NOT NULL)
+                -- ACTIVE -> FAILED requires setting completed time and error
+                OR (status = 'FAILED' AND completed IS NOT NULL AND error IS 
NOT NULL)
+            )
+            """,
+            name="valid_task_status_transitions",
+        ),
+    )
+
+
 class Release(SQLModel, table=True):
     storage_key: str = Field(primary_key=True)
     stage: ReleaseStage
diff --git a/atr/templates/secret/tasks-add-random.html b/atr/templates/secret/tasks-add-random.html
new file mode 100644
index 0000000..f140bb7
--- /dev/null
+++ b/atr/templates/secret/tasks-add-random.html
@@ -0,0 +1,48 @@
+{% extends "layouts/base.html" %}
+
+{% block title %}
+  Add random task ~ ATR
+{% endblock title %}
+
+{% block description %}
+  Add a random task to the queue for testing.
+{% endblock description %}
+
+{% block stylesheets %}
+  {{ super() }}
+  <style>
+      .status-message {
+          margin: 1.5rem 0;
+          padding: 1rem;
+          border-radius: 4px;
+      }
+
+      .status-message.success {
+          background: #d4edda;
+          border: 1px solid #c3e6cb;
+          color: #155724;
+      }
+
+      .status-message.error {
+          background: #f8d7da;
+          border: 1px solid #f5c6cb;
+          color: #721c24;
+      }
+  </style>
+{% endblock stylesheets %}
+
+{% block content %}
+  <div class="container mt-4">
+    <h1>Add random task</h1>
+
+    {% with messages = get_flashed_messages(with_categories=true) %}
+      {% if messages %}
+        {% for category, message in messages %}<div class="status-message {{ category }}">{{ message }}</div>{% endfor %}
+      {% endif %}
+    {% endwith %}
+
+    <form method="post" class="mt-4">
+      <button type="submit" class="btn btn-primary">Add random task</button>
+    </form>
+  </div>
+{% endblock content %}
diff --git a/atr/worker.py b/atr/worker.py
new file mode 100644
index 0000000..7c4d3e2
--- /dev/null
+++ b/atr/worker.py
@@ -0,0 +1,180 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""worker.py - Task worker process for ATR"""
+
+# TODO: If started is older than some threshold and status
+# is active but the pid is no longer running, we can revert
+# the task to status='QUEUED'. For this to work, ideally we
+# need to check wall clock time as well as CPU time.
+
+import datetime
+import logging
+import os
+import resource
+import signal
+import sys
+import time
+from datetime import UTC
+from typing import NoReturn
+
+from sqlalchemy import create_engine, text
+from sqlalchemy.orm import Session
+
+# Configure logging
+logging.basicConfig(
+    format="[%(asctime)s.%(msecs)03d] [%(process)d] [%(levelname)s] 
%(message)s",
+    level=logging.INFO,
+    datefmt="%Y-%m-%d %H:%M:%S",
+)
+logger = logging.getLogger(__name__)
+
+# Resource limits, 5 minutes and 1GB
+CPU_LIMIT_SECONDS = 300
+MEMORY_LIMIT_BYTES = 1024 * 1024 * 1024
+
+# Create database engine
+engine = create_engine("sqlite:///atr.db", echo=False)
+
+# # Create tables if they don't exist
+# SQLModel.metadata.create_all(engine)
+
+
+def get_db_session() -> Session:
+    """Get a new database session."""
+    return Session(engine)
+
+
+def claim_next_task() -> tuple[int, str, str] | None:
+    """
+    Attempt to claim the oldest unclaimed task.
+    Returns (task_id, task_type, task_args) if successful.
+    Returns None if no tasks are available.
+    """
+    with get_db_session() as session:
+        with session.begin():
+            # Find and claim the oldest unclaimed task
+            # We have an index on (status, added)
+            result = session.execute(
+                text("""
+                    UPDATE task
+                    SET started = :now, pid = :pid, status = 'ACTIVE'
+                    WHERE id = (
+                        SELECT id FROM task
+                        WHERE status = 'QUEUED'
+                        ORDER BY added ASC LIMIT 1
+                    )
+                    AND status = 'QUEUED'
+                    RETURNING id, task_type, task_args
+                    """),
+                {"now": datetime.datetime.now(UTC), "pid": os.getpid()},
+            )
+            task = result.first()
+            if task:
+                task_id, task_type, task_args = task
+                logger.info(f"Claimed task {task_id} ({task_type}) with args 
{task_args}")
+                return task_id, task_type, task_args
+
+            return None
+
+
+def process_task(task_id: int, task_type: str, task_args: str) -> None:
+    """Process a claimed task."""
+    logger.info(f"Processing task {task_id} ({task_type}) with args 
{task_args}")
+    try:
+        # TODO: Implement actual task processing
+        time.sleep(1)
+
+        with get_db_session() as session:
+            with session.begin():
+                session.execute(
+                    text("""
+                        UPDATE task
+                        SET completed = :now, status = 'COMPLETED'
+                        WHERE id = :task_id
+                        """),
+                    {"now": datetime.datetime.now(UTC), "task_id": task_id},
+                )
+    except Exception as e:
+        logger.error(f"Task {task_id} failed: {e}")
+        with get_db_session() as session:
+            with session.begin():
+                session.execute(
+                    text("""
+                        UPDATE task
+                        SET completed = :now, status = 'FAILED', error = :error
+                        WHERE id = :task_id
+                        """),
+                    {"now": datetime.datetime.now(UTC), "task_id": task_id, 
"error": str(e)},
+                )
+
+
+def set_resource_limits() -> None:
+    """Set CPU and memory limits for this process."""
+    # Set CPU time limit
+    try:
+        resource.setrlimit(resource.RLIMIT_CPU, (CPU_LIMIT_SECONDS, 
CPU_LIMIT_SECONDS))
+        logger.info(f"Set CPU time limit to {CPU_LIMIT_SECONDS} seconds")
+    except ValueError as e:
+        logger.warning(f"Could not set CPU time limit: {e}")
+
+    # Set memory limit
+    try:
+        resource.setrlimit(resource.RLIMIT_AS, (MEMORY_LIMIT_BYTES, 
MEMORY_LIMIT_BYTES))
+        logger.info(f"Set memory limit to {MEMORY_LIMIT_BYTES} bytes")
+    except ValueError as e:
+        logger.warning(f"Could not set memory limit: {e}")
+
+
+def worker_loop() -> NoReturn:
+    """Main worker loop."""
+    logger.info(f"Worker starting (PID: {os.getpid()})")
+
+    while True:
+        try:
+            task = claim_next_task()
+            if task:
+                task_id, task_type, task_args = task
+                process_task(task_id, task_type, task_args)
+            else:
+                # No tasks available, wait 20ms before checking again
+                time.sleep(0.02)
+        except Exception as e:
+            # TODO: Should probably be more robust about this
+            logger.error(f"Worker loop error: {e}")
+            time.sleep(1)
+
+
+def signal_handler(signum: int, frame: object) -> None:
+    """Handle termination signals gracefully."""
+    # For RLIMIT_AS we'll generally get a SIGKILL
+    # For RLIMIT_CPU we'll get a SIGXCPU, which we can catch
+    logger.info(f"Received signal {signum}, shutting down...")
+    sys.exit(0)
+
+
+def main() -> None:
+    """Main entry point."""
+    signal.signal(signal.SIGTERM, signal_handler)
+    signal.signal(signal.SIGINT, signal_handler)
+
+    set_resource_limits()
+    worker_loop()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/migrations/versions/b8dd95b83501_initial_schema.py b/migrations/versions/5d63e3dd5e93_initial_schema.py
similarity index 84%
rename from migrations/versions/b8dd95b83501_initial_schema.py
rename to migrations/versions/5d63e3dd5e93_initial_schema.py
index bc0deed..104e000 100644
--- a/migrations/versions/b8dd95b83501_initial_schema.py
+++ b/migrations/versions/5d63e3dd5e93_initial_schema.py
@@ -1,15 +1,15 @@
 """initial_schema
 
-Revision ID: b8dd95b83501
+Revision ID: 5d63e3dd5e93
 Revises:
-Create Date: 2025-02-19 20:20:11.349128
+Create Date: 2025-02-19 20:56:26.175412
 
 """
 
 from collections.abc import Sequence
 
 # revision identifiers, used by Alembic.
-revision: str = "b8dd95b83501"
+revision: str = "5d63e3dd5e93"
 down_revision: str | None = None
 branch_labels: str | Sequence[str] | None = None
 depends_on: str | Sequence[str] | None = None


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tooling.apache.org
For additional commands, e-mail: dev-h...@tooling.apache.org

Reply via email to