This is an automated email from the ASF dual-hosted git repository.

arm pushed a commit to branch pending_dist_changes
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git

commit 8ed69eb50c6941792f66447bf63cdebcfdff9a3e
Author: Alastair McFarlane <[email protected]>
AuthorDate: Thu Jan 29 15:04:32 2026 +0000

    #216 - Scheduled task for pending distributions, add created_by to dist 
table.
---
 atr/get/distribution.py                         |  6 ++++-
 atr/models/sql.py                               |  1 +
 atr/server.py                                   |  3 +++
 atr/storage/writers/distributions.py            |  2 ++
 atr/tasks/__init__.py                           |  1 +
 atr/tasks/distribution.py                       | 28 +++++++++++++++------
 migrations/versions/0043_2026.01.29_d7d89670.py | 33 +++++++++++++++++++++++++
 7 files changed, 65 insertions(+), 9 deletions(-)

diff --git a/atr/get/distribution.py b/atr/get/distribution.py
index 7bc9162..4bbf922 100644
--- a/atr/get/distribution.py
+++ b/atr/get/distribution.py
@@ -17,6 +17,7 @@
 from collections.abc import Sequence
 
 import asfquart.base as base
+import htpy
 
 import atr.blueprints.get as get
 import atr.db as db
@@ -77,12 +78,15 @@ async def list_get(session: web.Committer, project_name: 
str, version_name: str)
     ## Distributions
     block.h2["Distributions"]
     for dist in distributions:
+        title_extra = []
+        if dist.pending:
+            title_extra.append(htpy.small(".text-muted")[" (this distribution 
is being verified by ATR)"])
         ### Platform package version
         block.h3(
             # Cannot use "#id" here, because the ID contains "."
             # If an ID contains ".", htm parses that as a class
             id=f"distribution-{dist.identifier}"
-        )[dist.title]
+        )[dist.title, *title_extra]
         tbody = htm.tbody[
             shared.distribution.html_tr("Release name", dist.release_name),
             shared.distribution.html_tr("Platform", dist.platform.value.name),
diff --git a/atr/models/sql.py b/atr/models/sql.py
index e830083..0b8d8a4 100644
--- a/atr/models/sql.py
+++ b/atr/models/sql.py
@@ -976,6 +976,7 @@ class Distribution(sqlmodel.SQLModel, table=True):
     upload_date: datetime.datetime | None = sqlmodel.Field(default=None)
     api_url: str | None = sqlmodel.Field(default=None)
     web_url: str | None = sqlmodel.Field(default=None)
+    created_by: str | None = sqlmodel.Field(default=None)
     # The API response can be huge, e.g. from npm
     # So we do not store it in the database
     # api_response: Any = 
sqlmodel.Field(sa_column=sqlalchemy.Column(sqlalchemy.JSON))
diff --git a/atr/server.py b/atr/server.py
index 1ef0d60..70438d1 100644
--- a/atr/server.py
+++ b/atr/server.py
@@ -802,6 +802,9 @@ async def _register_recurrent_tasks() -> None:
         await asyncio.sleep(60)
         workflow = await tasks.workflow_update(asf_uid="system", 
schedule_next=True)
         log.info(f"Scheduled workflow status update with ID {workflow.id}")
+        await asyncio.sleep(60)
+        dist_check = await tasks.distribution_status_check(asf_uid="system", 
schedule_next=True)
+        log.info(f"Scheduled distribution status update with ID 
{dist_check.id}")
 
     except Exception as e:
         log.exception(f"Failed to schedule recurrent tasks: {e!s}")
diff --git a/atr/storage/writers/distributions.py 
b/atr/storage/writers/distributions.py
index 312a471..7a4b683 100644
--- a/atr/storage/writers/distributions.py
+++ b/atr/storage/writers/distributions.py
@@ -160,6 +160,7 @@ class CommitteeMember(CommitteeParticipant):
             upload_date=upload_date,
             api_url=api_url,
             web_url=web_url,
+            created_by=self.__asf_uid,
         )
         if existing is None:
             self.__data.add(dist)
@@ -271,6 +272,7 @@ class CommitteeMember(CommitteeParticipant):
             existing.upload_date = upload_date
             existing.api_url = api_url
             existing.web_url = web_url
+            existing.created_by = self.__asf_uid
             await self.__data.commit()
             return existing
         return None
diff --git a/atr/tasks/__init__.py b/atr/tasks/__init__.py
index ea613bb..8ba7fd3 100644
--- a/atr/tasks/__init__.py
+++ b/atr/tasks/__init__.py
@@ -71,6 +71,7 @@ async def clear_scheduled(caller_data: db.Session | None = 
None) -> None:
                 [
                     sql.TaskType.METADATA_UPDATE,
                     sql.TaskType.WORKFLOW_STATUS,
+                    sql.TaskType.DISTRIBUTION_STATUS,
                 ]
             ),
             via(sql.Task.status) == sql.TaskStatus.QUEUED,
diff --git a/atr/tasks/distribution.py b/atr/tasks/distribution.py
index 541f918..36caabf 100644
--- a/atr/tasks/distribution.py
+++ b/atr/tasks/distribution.py
@@ -39,10 +39,12 @@ class DistributionStatusCheckArgs(schema.Strict):
 
 @checks.with_model(DistributionStatusCheckArgs)
 async def status_check(args: DistributionStatusCheckArgs, *, task_id: int | 
None = None) -> results.Results | None:
+    log.info("Checking pending recorded distributions")
     dists = []
     async with db.session() as data:
         dists = await data.distribution(pending=True, _with_release=True, 
_with_release_project=True).all()
     for dist in dists:
+        name = f"{dist.platform} {dist.owner_namespace} {dist.package} 
{dist.version}"
         dd = distribution.Data(
             platform=dist.platform,
             owner_namespace=dist.owner_namespace,
@@ -50,30 +52,40 @@ async def status_check(args: DistributionStatusCheckArgs, 
*, task_id: int | None
             version=dist.version,
             details=False,
         )
+        if not dist.created_by:
+            log.warning(f"Distribution {name} has no creator, skipping")
+            continue
+        if not dist.release.project.committee_name:
+            log.warning(f"Distribution {name} has no committee, skipping")
+            continue
         try:
-            async with 
storage.write_as_committee_member(str(dist.release.project.committee_name), 
args.asf_uid) as w:
+            async with 
storage.write_as_committee_member(dist.release.project.committee_name, 
dist.created_by) as w:
                 if dist.retries >= _RETRY_LIMIT:
                     await w.distributions.delete_distribution(
                         dist.release_name, dist.platform, 
dist.owner_namespace, dist.package, dist.version
                     )
-                    name = f"{dist.platform} {dist.owner_namespace} 
{dist.package} {dist.version}"
                     log.error(f"Distribution {name} failed {_RETRY_LIMIT} 
times, skipping")
                     continue
+                log.warning(f"Retrying distribution {name}")
                 await w.distributions.record_from_data(
                     dist.release_name,
                     dist.staging,
                     dd,
+                    allow_retries=True,
                 )
         except storage.AccessError as e:
             msg = f"Failed to record distribution: {e}"
             log.error(msg)
             raise RuntimeError(msg)
-    if args.next_schedule_seconds:
-        next_schedule = datetime.datetime.now(datetime.UTC) + 
datetime.timedelta(seconds=args.next_schedule_seconds)
-        await tasks.distribution_status_check(args.asf_uid, 
schedule=next_schedule, schedule_next=True)
-        log.info(
-            f"Scheduled next workflow status update for: 
{next_schedule.strftime('%Y-%m-%d %H:%M:%S')}",
-        )
+        finally:
+            if args.next_schedule_seconds:
+                next_schedule = datetime.datetime.now(datetime.UTC) + 
datetime.timedelta(
+                    seconds=args.next_schedule_seconds
+                )
+                await tasks.distribution_status_check(args.asf_uid, 
schedule=next_schedule, schedule_next=True)
+                log.info(
+                    f"Scheduled next distribution status update for: {next_schedule.strftime('%Y-%m-%d %H:%M:%S')}",
+                )
     return results.DistributionStatusCheck(
         kind="distribution_status",
     )
diff --git a/migrations/versions/0043_2026.01.29_d7d89670.py 
b/migrations/versions/0043_2026.01.29_d7d89670.py
new file mode 100644
index 0000000..8342377
--- /dev/null
+++ b/migrations/versions/0043_2026.01.29_d7d89670.py
@@ -0,0 +1,33 @@
+"""column for distribution creator
+
+Revision ID: 0043_2026.01.29_d7d89670
+Revises: 0042_2026.01.28_3e434625
+Create Date: 2026-01-29 13:56:34.371692+00:00
+"""
+
+from collections.abc import Sequence
+
+import sqlalchemy as sa
+from alembic import op
+
+# Revision identifiers, used by Alembic
+revision: str = "0043_2026.01.29_d7d89670"
+down_revision: str | None = "0042_2026.01.28_3e434625"
+branch_labels: str | Sequence[str] | None = None
+depends_on: str | Sequence[str] | None = None
+
+
+def upgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+    with op.batch_alter_table("distribution", schema=None) as batch_op:
+        batch_op.add_column(sa.Column("created_by", sa.String(), 
nullable=True))
+
+    # ### end Alembic commands ###
+
+
+def downgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+    with op.batch_alter_table("distribution", schema=None) as batch_op:
+        batch_op.drop_column("created_by")
+
+    # ### end Alembic commands ###


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to