This is an automated email from the ASF dual-hosted git repository.

arm pushed a commit to branch arm
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git

commit 033d5ab1a0518a681ee0fb41be4a72ce31a46efe
Author: Alastair McFarlane <[email protected]>
AuthorDate: Tue Mar 3 15:01:34 2026 +0000

    Add taint tracking types to API routes
---
 atr/api/__init__.py      | 511 ++++++++++++++++++++++++++++++++---------------
 atr/blueprints/api.py    | 127 +++++++++++-
 atr/blueprints/common.py | 109 +++++++++-
 3 files changed, 578 insertions(+), 169 deletions(-)

diff --git a/atr/api/__init__.py b/atr/api/__init__.py
index 168d9fec..3c453899 100644
--- a/atr/api/__init__.py
+++ b/atr/api/__init__.py
@@ -37,6 +37,7 @@ import atr.hashes as hashes
 import atr.jwtoken as jwtoken
 import atr.log as log
 import atr.models as models
+import atr.models.safe as safe
 import atr.models.sql as sql
 import atr.paths as paths
 import atr.principal as principal
@@ -59,10 +60,16 @@ type DictResponse = tuple[dict[str, Any], int]
 ROUTES_MODULE: Final[Literal[True]] = True
 
 
-@api.route("/checks/list/<project>/<version>")
+@api.typed
 @quart_schema.validate_response(models.api.ChecksListResults, 200)
-async def checks_list(project: str, version: str) -> DictResponse:
+async def checks_list(
+    _checks_list: Literal["checks/list"],
+    project_name: safe.ProjectName,
+    version_name: safe.VersionName,
+) -> DictResponse:
     """
+    URL: GET /checks/list/<project_name>/<version_name>
+
     List checks by project and version.
 
     Checks are only conducted during the compose a draft phase. This endpoint
@@ -74,10 +81,8 @@ async def checks_list(project: str, version: str) -> 
DictResponse:
     may potentially be thousands or results or more.
     """
     # TODO: We should perhaps paginate this
-    _simple_check(project, version)
-
     async with db.session() as data:
-        release_name = sql.release_name(project, version)
+        release_name = sql.release_name(str(project_name), str(version_name))
         release = await 
data.release(name=release_name).demand(exceptions.NotFound(f"Release 
{release_name} not found"))
         check_results = await interaction.checks_for(release, caller_data=data)
 
@@ -89,10 +94,17 @@ async def checks_list(project: str, version: str) -> 
DictResponse:
     ).model_dump(), 200
 
 
-@api.route("/checks/list/<project>/<version>/<revision>")
+@api.typed
 @quart_schema.validate_response(models.api.ChecksListResults, 200)
-async def checks_list_revision(project: str, version: str, revision: str) -> 
DictResponse:
+async def checks_list_revision(
+    _checks_list: Literal["checks/list"],
+    project_name: safe.ProjectName,
+    version_name: safe.VersionName,
+    revision: str,
+) -> DictResponse:
     """
+    URL: GET /checks/list/<project_name>/<version_name>/<revision>
+
     List checks by project, version, and revision.
 
     Checks are only conducted during the compose a draft phase. This endpoint
@@ -103,20 +115,15 @@ async def checks_list_revision(project: str, version: 
str, revision: str) -> Dic
     Warning: the check results include results for archive members, so there
     may potentially be thousands or results or more.
     """
-    _simple_check(project, version, revision)
     async with db.session() as data:
-        project_result = await data.project(name=project).get()
-        if project_result is None:
-            raise exceptions.NotFound(f"Project '{project}' does not exist")
-
-        release_name = sql.release_name(project, version)
-        release_result = await data.release(name=release_name).get()
-        if release_result is None:
-            raise exceptions.NotFound(f"Release '{project}-{version}' does not 
exist")
+        release_name = sql.release_name(str(project_name), str(version_name))
+        release_result = await data.release(name=release_name).demand(
+            exceptions.NotFound(f"Release '{release_name}' does not exist")
+        )
 
         revision_result = await data.revision(release_name=release_name, 
number=revision).get()
         if revision_result is None:
-            raise exceptions.NotFound(f"Revision '{revision}' does not exist 
for release '{project}-{version}'")
+            raise exceptions.NotFound(f"Revision '{revision}' does not exist 
for release '{release_name}'")
 
         check_results = await interaction.checks_for(release_result, 
revision=revision, caller_data=data)
 
@@ -128,15 +135,17 @@ async def checks_list_revision(project: str, version: 
str, revision: str) -> Dic
     ).model_dump(), 200
 
 
-@api.route("/checks/ongoing/<project>/<version>", defaults={"revision": None})
-@api.route("/checks/ongoing/<project>/<version>/<revision>")
+@api.typed
 @quart_schema.validate_response(models.api.ChecksOngoingResults, 200)
 async def checks_ongoing(
-    project: str,
-    version: str,
+    _checks_ongoing: Literal["checks/ongoing"],
+    project_name: safe.ProjectName,
+    version_name: safe.VersionName,
     revision: str | None = None,
 ) -> DictResponse:
     """
+    URL: GET /checks/ongoing/<project_name>/<version_name>[/<revision>]
+
     Count ongoing checks by project, version, and optionally revision.
 
     Checks are only conducted during the compose a draft phase. This endpoint
@@ -144,8 +153,9 @@ async def checks_ongoing(
     present, or the most recent draft revision otherwise. A draft release
     cannot be promoted to the vote phase if checks are still ongoing.
     """
-    _simple_check(project, version, revision)
-    ongoing_tasks_count, _latest_revision = await 
interaction.tasks_ongoing_revision(project, version, revision)
+    ongoing_tasks_count, _latest_revision = await 
interaction.tasks_ongoing_revision(
+        str(project_name), str(version_name), revision
+    )
     # TODO: Is there a way to return just an int?
     # The ResponseReturnValue type in quart does not allow int
     # And if we use quart.jsonify, we must return web.QuartResponse which 
quart_schema tries to validate
@@ -165,10 +175,15 @@ async def checks_ongoing(
     ).model_dump(), 200
 
 
-@api.route("/committee/get/<name>")
+@api.typed
 @quart_schema.validate_response(models.api.CommitteeGetResults, 200)
-async def committee_get(name: str) -> DictResponse:
+async def committee_get(
+    _committee_get: Literal["committee/get"],
+    name: str,
+) -> DictResponse:
     """
+    URL: GET /committee/get/<name>
+
     Get a committee by name.
 
     The name of the committee is the name without any prefixes or suffixes such
@@ -176,7 +191,6 @@ async def committee_get(name: str) -> DictResponse:
     The Apache Simple Example PMC, for example, would have the name
     "simple-example".
     """
-    _simple_check(name)
     async with db.session() as data:
         committee = await 
data.committee(name=name).demand(exceptions.NotFound(f"Committee '{name}' was 
not found"))
     return models.api.CommitteeGetResults(
@@ -185,10 +199,15 @@ async def committee_get(name: str) -> DictResponse:
     ).model_dump(), 200
 
 
-@api.route("/committee/keys/<name>")
+@api.typed
 @quart_schema.validate_response(models.api.CommitteeKeysResults, 200)
-async def committee_keys(name: str) -> DictResponse:
+async def committee_keys(
+    _committee_keys: Literal["committee/keys"],
+    name: str,
+) -> DictResponse:
     """
+    URL: GET /committee/keys/<name>
+
     List public OpenPGP keys by committee name.
 
     The name of the committee is the name without any prefixes or suffixes such
@@ -196,7 +215,6 @@ async def committee_keys(name: str) -> DictResponse:
     The Apache Simple Example PMC, for example, would have the name
     "simple-example".
     """
-    _simple_check(name)
     async with db.session() as data:
         committee = await data.committee(name=name, 
_public_signing_keys=True).demand(
             exceptions.NotFound(f"Committee '{name}' was not found")
@@ -207,10 +225,15 @@ async def committee_keys(name: str) -> DictResponse:
     ).model_dump(), 200
 
 
-@api.route("/committee/projects/<name>")
+@api.typed
 @quart_schema.validate_response(models.api.CommitteeProjectsResults, 200)
-async def committee_projects(name: str) -> DictResponse:
+async def committee_projects(
+    _committee_projects: Literal["committee/projects"],
+    name: str,
+) -> DictResponse:
     """
+    URL: GET /committee/projects/<name>
+
     List projects by committee name.
 
     The name of the committee is the name without any prefixes or suffixes such
@@ -218,7 +241,6 @@ async def committee_projects(name: str) -> DictResponse:
     The Apache Simple Example PMC, for example, would have the name
     "simple-example".
     """
-    _simple_check(name)
     async with db.session() as data:
         committee = await data.committee(name=name, _projects=True).demand(
             exceptions.NotFound(f"Committee '{name}' was not found")
@@ -229,10 +251,14 @@ async def committee_projects(name: str) -> DictResponse:
     ).model_dump(), 200
 
 
-@api.route("/committees/list")
+@api.typed
 @quart_schema.validate_response(models.api.CommitteesListResults, 200)
-async def committees_list() -> DictResponse:
+async def committees_list(
+    _committees_list: Literal["committees/list"],
+) -> DictResponse:
     """
+    URL: GET /committees/list
+
     List committees.
 
     The list of committees is returned in no particular order.
@@ -245,11 +271,15 @@ async def committees_list() -> DictResponse:
     ).model_dump(), 200
 
 
-@api.route("/distribute/ssh/register", methods=["POST"])
+@api.typed
 @rate_limiter.rate_limit(10, datetime.timedelta(hours=1))
-@quart_schema.validate_request(models.api.DistributeSshRegisterArgs)
-async def distribute_ssh_register(data: models.api.DistributeSshRegisterArgs) 
-> DictResponse:
+async def distribute_ssh_register(
+    _distribute_ssh_register: Literal["distribute/ssh/register"],
+    data: models.api.DistributeSshRegisterArgs,
+) -> DictResponse:
     """
+    URL: POST /distribute/ssh/register
+
     Register an SSH key sent with a corroborating Trusted Publisher JWT,
     validating the requested release is in the correct phase.
     """
@@ -278,13 +308,17 @@ async def distribute_ssh_register(data: 
models.api.DistributeSshRegisterArgs) ->
     ).model_dump(), 200
 
 
-@api.route("/distribution/record", methods=["POST"])
+@api.typed
 @jwtoken.require
 @quart_schema.security_scheme([{"BearerAuth": []}])
-@quart_schema.validate_request(models.api.DistributionRecordArgs)
 @quart_schema.validate_response(models.api.DistributionRecordResults, 200)
-async def distribution_record(data: models.api.DistributionRecordArgs) -> 
DictResponse:
+async def distribution_record(
+    _distribution_record: Literal["distribution/record"],
+    data: models.api.DistributionRecordArgs,
+) -> DictResponse:
     """
+    URL: POST /distribution/record
+
     Record a manual distribution.
     """
     asf_uid = _jwt_asf_uid()
@@ -319,10 +353,14 @@ async def distribution_record(data: 
models.api.DistributionRecordArgs) -> DictRe
     ).model_dump(), 200
 
 
-@api.route("/distribute/record_from_workflow", methods=["POST"])
-@quart_schema.validate_request(models.api.DistributionRecordFromWorkflowArgs)
-async def distribution_record_from_workflow(data: 
models.api.DistributionRecordFromWorkflowArgs) -> DictResponse:
+@api.typed
+async def distribution_record_from_workflow(
+    _distribute_record_from_workflow: 
Literal["distribute/record_from_workflow"],
+    data: models.api.DistributionRecordFromWorkflowArgs,
+) -> DictResponse:
     """
+    URL: POST /distribute/record_from_workflow
+
     Record the result of an automated distribution from the GH tooling-actions 
workflow.
     """
     _payload, asf_uid, _project, release = await 
interaction.trusted_jwt_for_dist(
@@ -356,13 +394,17 @@ async def distribution_record_from_workflow(data: 
models.api.DistributionRecordF
     ).model_dump(), 200
 
 
-@api.route("/ignore/add", methods=["POST"])
+@api.typed
 @jwtoken.require
 @quart_schema.security_scheme([{"BearerAuth": []}])
-@quart_schema.validate_request(models.api.IgnoreAddArgs)
 @quart_schema.validate_response(models.api.IgnoreAddResults, 200)
-async def ignore_add(data: models.api.IgnoreAddArgs) -> DictResponse:
+async def ignore_add(
+    _ignore_add: Literal["ignore/add"],
+    data: models.api.IgnoreAddArgs,
+) -> DictResponse:
     """
+    URL: POST /ignore/add
+
     Add a check ignore.
     """
     asf_uid = _jwt_asf_uid()
@@ -386,13 +428,17 @@ async def ignore_add(data: models.api.IgnoreAddArgs) -> 
DictResponse:
     ).model_dump(), 200
 
 
-@api.route("/ignore/delete", methods=["POST"])
+@api.typed
 @jwtoken.require
 @quart_schema.security_scheme([{"BearerAuth": []}])
-@quart_schema.validate_request(models.api.IgnoreDeleteArgs)
 @quart_schema.validate_response(models.api.IgnoreDeleteResults, 200)
-async def ignore_delete(data: models.api.IgnoreDeleteArgs) -> DictResponse:
+async def ignore_delete(
+    _ignore_delete: Literal["ignore/delete"],
+    data: models.api.IgnoreDeleteArgs,
+) -> DictResponse:
     """
+    URL: POST /ignore/delete
+
     Delete a check ignore.
     """
     asf_uid = _jwt_asf_uid()
@@ -410,27 +456,35 @@ async def ignore_delete(data: 
models.api.IgnoreDeleteArgs) -> DictResponse:
 
 
 # TODO: Rename to ignores
-@api.route("/ignore/list/<project_name>")
+@api.typed
 @quart_schema.validate_response(models.api.IgnoreListResults, 200)
-async def ignore_list(project_name: str) -> DictResponse:
+async def ignore_list(
+    _ignore_list: Literal["ignore/list"],
+    project_name: safe.ProjectName,
+) -> DictResponse:
     """
+    URL: GET /ignore/list/<project_name>
+
     List ignores by project name.
     """
-    _simple_check(project_name)
     async with db.session() as data:
-        await data.project(name=project_name).demand(exceptions.NotFound())
-        ignores = await 
data.check_result_ignore(project_name=project_name).all()
+        await 
data.project(name=str(project_name)).demand(exceptions.NotFound())
+        ignores = await 
data.check_result_ignore(project_name=str(project_name)).all()
     return models.api.IgnoreListResults(
         endpoint="/ignore/list",
         ignores=ignores,
     ).model_dump(), 200
 
 
-@api.route("/jwt/create", methods=["POST"])
+@api.typed
 @rate_limiter.rate_limit(10, datetime.timedelta(hours=1))
-@quart_schema.validate_request(models.api.JwtCreateArgs)
-async def jwt_create(data: models.api.JwtCreateArgs) -> DictResponse:
+async def jwt_create(
+    _jwt_create: Literal["jwt/create"],
+    data: models.api.JwtCreateArgs,
+) -> DictResponse:
     """
+    URL: POST /jwt/create
+
     Create a JWT.
 
     The payload must include a valid PAT.
@@ -450,14 +504,18 @@ async def jwt_create(data: models.api.JwtCreateArgs) -> 
DictResponse:
     ).model_dump(), 200
 
 
-@api.route("/key/add", methods=["POST"])
+@api.typed
 @rate_limiter.rate_limit(10, datetime.timedelta(hours=1))
 @jwtoken.require
 @quart_schema.security_scheme([{"BearerAuth": []}])
-@quart_schema.validate_request(models.api.KeyAddArgs)
 @quart_schema.validate_response(models.api.KeyAddResults, 200)
-async def key_add(data: models.api.KeyAddArgs) -> DictResponse:
+async def key_add(
+    _key_add: Literal["key/add"],
+    data: models.api.KeyAddArgs,
+) -> DictResponse:
     """
+    URL: POST /key/add
+
     Add a public OpenPGP key.
 
     Once associated with the specified committees, the key will appear in the
@@ -484,13 +542,17 @@ async def key_add(data: models.api.KeyAddArgs) -> 
DictResponse:
     ).model_dump(), 200
 
 
-@api.route("/key/delete", methods=["POST"])
+@api.typed
 @jwtoken.require
 @quart_schema.security_scheme([{"BearerAuth": []}])
-@quart_schema.validate_request(models.api.KeyDeleteArgs)
 @quart_schema.validate_response(models.api.KeyDeleteResults, 200)
-async def key_delete(data: models.api.KeyDeleteArgs) -> DictResponse:
+async def key_delete(
+    _key_delete: Literal["key/delete"],
+    data: models.api.KeyDeleteArgs,
+) -> DictResponse:
     """
+    URL: POST /key/delete
+
     Delete a public OpenPGP key.
 
     Warning: we plan to change how key deletion works.
@@ -517,16 +579,20 @@ async def key_delete(data: models.api.KeyDeleteArgs) -> 
DictResponse:
     ).model_dump(), 200
 
 
-@api.route("/key/get/<fingerprint>")
+@api.typed
 @rate_limiter.rate_limit(10, datetime.timedelta(hours=1))
 @quart_schema.validate_response(models.api.KeyGetResults, 200)
-async def key_get(fingerprint: str) -> DictResponse:
+async def key_get(
+    _key_get: Literal["key/get"],
+    fingerprint: str,
+) -> DictResponse:
     """
+    URL: GET /key/get/<fingerprint>
+
     Get a public OpenPGP key by fingerprint.
 
     All public OpenPGP keys stored within the database are accessible.
     """
-    _simple_check(fingerprint)
     async with db.session() as data:
         key = await 
data.public_signing_key(fingerprint=fingerprint.lower()).demand(
             exceptions.NotFound(f"Key '{fingerprint}' not found")
@@ -537,14 +603,18 @@ async def key_get(fingerprint: str) -> DictResponse:
     ).model_dump(), 200
 
 
-@api.route("/keys/upload", methods=["POST"])
+@api.typed
 @rate_limiter.rate_limit(10, datetime.timedelta(hours=1))
 @jwtoken.require
 @quart_schema.security_scheme([{"BearerAuth": []}])
-@quart_schema.validate_request(models.api.KeysUploadArgs)
 @quart_schema.validate_response(models.api.KeysUploadResults, 200)
-async def keys_upload(data: models.api.KeysUploadArgs) -> DictResponse:
+async def keys_upload(
+    _keys_upload: Literal["keys/upload"],
+    data: models.api.KeysUploadArgs,
+) -> DictResponse:
     """
+    URL: POST /keys/upload
+
     Upload a public OpenPGP KEYS file.
     """
     asf_uid = _jwt_asf_uid()
@@ -595,14 +665,18 @@ async def keys_upload(data: models.api.KeysUploadArgs) -> 
DictResponse:
     ).model_dump(), 200
 
 
-@api.route("/keys/user/<asf_uid>")
+@api.typed
 @rate_limiter.rate_limit(10, datetime.timedelta(hours=1))
 @quart_schema.validate_response(models.api.KeysUserResults, 200)
-async def keys_user(asf_uid: str) -> DictResponse:
+async def keys_user(
+    _keys_user: Literal["keys/user"],
+    asf_uid: str,
+) -> DictResponse:
     """
+    URL: GET /keys/user/<asf_uid>
+
     List public OpenPGP keys by the ASF UID of a user.
     """
-    _simple_check(asf_uid)
     async with db.session() as data:
         keys = await data.public_signing_key(apache_uid=asf_uid).all()
     return models.api.KeysUserResults(
@@ -611,33 +685,43 @@ async def keys_user(asf_uid: str) -> DictResponse:
     ).model_dump(), 200
 
 
-@api.route("/project/get/<name>")
+@api.typed
 @quart_schema.validate_response(models.api.ProjectGetResults, 200)
-async def project_get(name: str) -> DictResponse:
+async def project_get(
+    _project_get: Literal["project/get"],
+    project_name: safe.ProjectName,
+) -> DictResponse:
     """
+    URL: GET /project/get/<project_name>
+
     Get a project by name.
     """
-    _simple_check(name)
     async with db.session() as data:
-        project = await data.project(name=name).demand(exceptions.NotFound())
+        project = await 
data.project(name=str(project_name)).demand(exceptions.NotFound())
     return models.api.ProjectGetResults(
         endpoint="/project/get",
         project=project,
     ).model_dump(), 200
 
 
-@api.route("/project/policy/<name>")
+@api.typed
 @quart_schema.validate_response(models.api.ProjectPolicyResults, 200)
-async def project_policy(name: str) -> DictResponse:
+async def project_policy(
+    _project_policy: Literal["project/policy"],
+    project_name: safe.ProjectName,
+) -> DictResponse:
     """
+    URL: GET /project/policy/<project_name>
+
     Get project policy by name.
 
     Returns the release policy settings for a project.
     If no policy has been configured, defaults are returned.
     """
-    _simple_check(name)
     async with db.session() as data:
-        project = await data.project(name=name, _release_policy=True, 
_committee=True).demand(exceptions.NotFound())
+        project = await data.project(name=str(project_name), 
_release_policy=True, _committee=True).demand(
+            exceptions.NotFound()
+        )
     return models.api.ProjectPolicyResults(
         endpoint="/project/policy",
         project_name=project.name,
@@ -664,26 +748,34 @@ async def project_policy(name: str) -> DictResponse:
     ).model_dump(), 200
 
 
-@api.route("/project/releases/<name>")
+@api.typed
 @quart_schema.validate_response(models.api.ProjectReleasesResults, 200)
-async def project_releases(name: str) -> DictResponse:
+async def project_releases(
+    _project_releases: Literal["project/releases"],
+    project_name: safe.ProjectName,
+) -> DictResponse:
     """
+    URL: GET /project/releases/<project_name>
+
     List releases by project name.
     """
-    _simple_check(name)
     async with db.session() as data:
-        await data.project(name=name).demand(exceptions.NotFound())
-        releases = await data.release(project_name=name).all()
+        await 
data.project(name=str(project_name)).demand(exceptions.NotFound())
+        releases = await data.release(project_name=str(project_name)).all()
     return models.api.ProjectReleasesResults(
         endpoint="/project/releases",
         releases=releases,
     ).model_dump(), 200
 
 
-@api.route("/projects/list")
+@api.typed
 @quart_schema.validate_response(models.api.ProjectsListResults, 200)
-async def projects_list() -> DictResponse:
+async def projects_list(
+    _projects_list: Literal["projects/list"],
+) -> DictResponse:
     """
+    URL: GET /projects/list
+
     List projects.
     """
     # TODO: Add pagination?
@@ -695,10 +787,14 @@ async def projects_list() -> DictResponse:
     ).model_dump(), 200
 
 
-@api.route("/publisher/distribution/record", methods=["POST"])
-@quart_schema.validate_request(models.api.PublisherDistributionRecordArgs)
-async def publisher_distribution_record(data: 
models.api.PublisherDistributionRecordArgs) -> DictResponse:
+@api.typed
+async def publisher_distribution_record(
+    _publisher_distribution_record: Literal["publisher/distribution/record"],
+    data: models.api.PublisherDistributionRecordArgs,
+) -> DictResponse:
     """
+    URL: POST /publisher/distribution/record
+
     Record a distribution with a corroborating Trusted Publisher JWT.
     """
     try:
@@ -743,10 +839,14 @@ async def publisher_distribution_record(data: 
models.api.PublisherDistributionRe
     ).model_dump(), 200
 
 
-@api.route("/publisher/release/announce", methods=["POST"])
-@quart_schema.validate_request(models.api.PublisherReleaseAnnounceArgs)
-async def publisher_release_announce(data: 
models.api.PublisherReleaseAnnounceArgs) -> DictResponse:
+@api.typed
+async def publisher_release_announce(
+    _publisher_release_announce: Literal["publisher/release/announce"],
+    data: models.api.PublisherReleaseAnnounceArgs,
+) -> DictResponse:
     """
+    URL: POST /publisher/release/announce
+
     Announce a release with a corroborating Trusted Publisher JWT.
     """
     _payload, asf_uid, project = await interaction.trusted_jwt(
@@ -777,11 +877,15 @@ async def publisher_release_announce(data: 
models.api.PublisherReleaseAnnounceAr
     ).model_dump(), 200
 
 
-@api.route("/publisher/ssh/register", methods=["POST"])
+@api.typed
 @rate_limiter.rate_limit(10, datetime.timedelta(hours=1))
-@quart_schema.validate_request(models.api.PublisherSshRegisterArgs)
-async def publisher_ssh_register(data: models.api.PublisherSshRegisterArgs) -> 
DictResponse:
+async def publisher_ssh_register(
+    _publisher_ssh_register: Literal["publisher/ssh/register"],
+    data: models.api.PublisherSshRegisterArgs,
+) -> DictResponse:
     """
+    URL: POST /publisher/ssh/register
+
     Register an SSH key sent with a corroborating Trusted Publisher JWT.
     """
     payload, asf_uid, project = await interaction.trusted_jwt(
@@ -804,10 +908,14 @@ async def publisher_ssh_register(data: 
models.api.PublisherSshRegisterArgs) -> D
     ).model_dump(), 200
 
 
-@api.route("/publisher/vote/resolve", methods=["POST"])
-@quart_schema.validate_request(models.api.PublisherVoteResolveArgs)
-async def publisher_vote_resolve(data: models.api.PublisherVoteResolveArgs) -> 
DictResponse:
+@api.typed
+async def publisher_vote_resolve(
+    _publisher_vote_resolve: Literal["publisher/vote/resolve"],
+    data: models.api.PublisherVoteResolveArgs,
+) -> DictResponse:
     """
+    URL: POST /publisher/vote/resolve
+
     Resolve a vote with a corroborating Trusted Publisher JWT.
     """
     # TODO: Need to be able to resolve and make the release immutable
@@ -833,13 +941,17 @@ async def publisher_vote_resolve(data: 
models.api.PublisherVoteResolveArgs) -> D
     ).model_dump(), 200
 
 
-@api.route("/release/announce", methods=["POST"])
+@api.typed
 @jwtoken.require
 @quart_schema.security_scheme([{"BearerAuth": []}])
-@quart_schema.validate_request(models.api.ReleaseAnnounceArgs)
 @quart_schema.validate_response(models.api.ReleaseAnnounceResults, 201)
-async def release_announce(data: models.api.ReleaseAnnounceArgs) -> 
DictResponse:
+async def release_announce(
+    _release_announce: Literal["release/announce"],
+    data: models.api.ReleaseAnnounceArgs,
+) -> DictResponse:
     """
+    URL: POST /release/announce
+
     Announce a release.
 
     After a vote on a release has passed, if everything is in order and all
@@ -871,13 +983,17 @@ async def release_announce(data: 
models.api.ReleaseAnnounceArgs) -> DictResponse
     ).model_dump(), 201
 
 
-@api.route("/release/create", methods=["POST"])
+@api.typed
 @jwtoken.require
 @quart_schema.security_scheme([{"BearerAuth": []}])
-@quart_schema.validate_request(models.api.ReleaseCreateArgs)
 @quart_schema.validate_response(models.api.ReleaseCreateResults, 201)
-async def release_create(data: models.api.ReleaseCreateArgs) -> DictResponse:
+async def release_create(
+    _release_create: Literal["release/create"],
+    data: models.api.ReleaseCreateArgs,
+) -> DictResponse:
     """
+    URL: POST /release/create
+
     Create a release.
 
     Release are created as a draft, which must be composed.
@@ -898,13 +1014,17 @@ async def release_create(data: 
models.api.ReleaseCreateArgs) -> DictResponse:
 
 
 # TODO: Duplicates the below
-@api.route("/release/delete", methods=["POST"])
+@api.typed
 @jwtoken.require
 @quart_schema.security_scheme([{"BearerAuth": []}])
-@quart_schema.validate_request(models.api.ReleaseDeleteArgs)
 @quart_schema.validate_response(models.api.ReleaseDeleteResults, 200)
-async def release_delete(data: models.api.ReleaseDeleteArgs) -> DictResponse:
+async def release_delete(
+    _release_delete: Literal["release/delete"],
+    data: models.api.ReleaseDeleteArgs,
+) -> DictResponse:
     """
+    URL: POST /release/delete
+
     Delete a release.
     """
     asf_uid = _jwt_asf_uid()
@@ -923,15 +1043,20 @@ async def release_delete(data: 
models.api.ReleaseDeleteArgs) -> DictResponse:
     ).model_dump(), 200
 
 
-@api.route("/release/get/<project>/<version>")
+@api.typed
 @quart_schema.validate_response(models.api.ReleaseGetResults, 200)
-async def release_get(project: str, version: str) -> DictResponse:
+async def release_get(
+    _release_get: Literal["release/get"],
+    project_name: safe.ProjectName,
+    version_name: safe.VersionName,
+) -> DictResponse:
     """
+    URL: GET /release/get/<project_name>/<version_name>
+
     Get a release by project and version.
     """
-    _simple_check(project, version)
     async with db.session() as data:
-        release_name = sql.release_name(project, version)
+        release_name = sql.release_name(str(project_name), str(version_name))
         release = await 
data.release(name=release_name).demand(exceptions.NotFound())
     return models.api.ReleaseGetResults(
         endpoint="/release/get",
@@ -939,16 +1064,21 @@ async def release_get(project: str, version: str) -> 
DictResponse:
     ).model_dump(), 200
 
 
-@api.route("/release/paths/<project>/<version>")
-@api.route("/release/paths/<project>/<version>/<revision>")
+@api.typed
 @quart_schema.validate_response(models.api.ReleasePathsResults, 200)
-async def release_paths(project: str, version: str, revision: str | None = 
None) -> DictResponse:
+async def release_paths(
+    _release_paths: Literal["release/paths"],
+    project_name: safe.ProjectName,
+    version_name: safe.VersionName,
+    revision: str | None = None,
+) -> DictResponse:
     """
+    URL: GET /release/paths/<project_name>/<version_name>[/<revision>]
+
     List paths in a release by project and version.
     """
-    _simple_check(project, version, revision)
     async with db.session() as data:
-        release_name = sql.release_name(project, version)
+        release_name = sql.release_name(str(project_name), str(version_name))
         release = await 
data.release(name=release_name).demand(exceptions.NotFound())
         if revision is None:
             dir_path = paths.release_directory(release)
@@ -965,15 +1095,20 @@ async def release_paths(project: str, version: str, 
revision: str | None = None)
     ).model_dump(), 200
 
 
-@api.route("/release/revisions/<project>/<version>")
+@api.typed
 @quart_schema.validate_response(models.api.ReleaseRevisionsResults, 200)
-async def release_revisions(project: str, version: str) -> DictResponse:
+async def release_revisions(
+    _release_revisions: Literal["release/revisions"],
+    project_name: safe.ProjectName,
+    version_name: safe.VersionName,
+) -> DictResponse:
     """
+    URL: GET /release/revisions/<project_name>/<version_name>
+
     List revisions by project and version.
     """
-    _simple_check(project, version)
     async with db.session() as data:
-        release_name = sql.release_name(project, version)
+        release_name = sql.release_name(str(project_name), str(version_name))
         revisions = await data.revision(release_name=release_name).all()
     if not isinstance(revisions, list):
         revisions = list(revisions)
@@ -984,13 +1119,17 @@ async def release_revisions(project: str, version: str) 
-> DictResponse:
     ).model_dump(), 200
 
 
-@api.route("/release/upload", methods=["POST"])
+@api.typed
 @jwtoken.require
 @quart_schema.security_scheme([{"BearerAuth": []}])
-@quart_schema.validate_request(models.api.ReleaseUploadArgs)
 @quart_schema.validate_response(models.api.ReleaseUploadResults, 201)
-async def release_upload(data: models.api.ReleaseUploadArgs) -> DictResponse:
+async def release_upload(
+    _release_upload: Literal["release/upload"],
+    data: models.api.ReleaseUploadArgs,
+) -> DictResponse:
     """
+    URL: POST /release/upload
+
     Upload a file to a release.
     """
     asf_uid = _jwt_asf_uid()
@@ -1016,11 +1155,15 @@ async def release_upload(data: 
models.api.ReleaseUploadArgs) -> DictResponse:
     ).model_dump(), 201
 
 
-@api.route("/releases/list")
-@quart_schema.validate_querystring(models.api.ReleasesListQuery)
+@api.typed
 @quart_schema.validate_response(models.api.ReleasesListResults, 200)
-async def releases_list(query_args: models.api.ReleasesListQuery) -> 
DictResponse:
+async def releases_list(
+    _releases_list: Literal["releases/list"],
+    query_args: models.api.ReleasesListQuery,
+) -> DictResponse:
     """
+    URL: GET /releases/list
+
     List releases.
 
     The list of releases is paged and can be filtered by phase.
@@ -1058,13 +1201,17 @@ async def releases_list(query_args: 
models.api.ReleasesListQuery) -> DictRespons
     ).model_dump(), 200
 
 
[email protected]("/signature/provenance", methods=["POST"])
[email protected]
 @jwtoken.require
 @quart_schema.security_scheme([{"BearerAuth": []}])
-@quart_schema.validate_request(models.api.SignatureProvenanceArgs)
 @quart_schema.validate_response(models.api.SignatureProvenanceResults, 200)
-async def signature_provenance(data: models.api.SignatureProvenanceArgs) -> 
DictResponse:
+async def signature_provenance(
+    _signature_provenance: Literal["signature/provenance"],
+    data: models.api.SignatureProvenanceArgs,
+) -> DictResponse:
     """
+    URL: POST /signature/provenance
+
     Get the provenance of a signature.
     """
     # POST because this uses significant computation and I/O
@@ -1121,14 +1268,18 @@ async def signature_provenance(data: 
models.api.SignatureProvenanceArgs) -> Dict
     ).model_dump(), 200
 
 
[email protected]("/ssh-key/add", methods=["POST"])
[email protected]
 @rate_limiter.rate_limit(10, datetime.timedelta(hours=1))
 @jwtoken.require
 @quart_schema.security_scheme([{"BearerAuth": []}])
-@quart_schema.validate_request(models.api.SshKeyAddArgs)
 @quart_schema.validate_response(models.api.SshKeyAddResults, 201)
-async def ssh_key_add(data: models.api.SshKeyAddArgs) -> DictResponse:
+async def ssh_key_add(
+    _ssh_key_add: Literal["ssh-key/add"],
+    data: models.api.SshKeyAddArgs,
+) -> DictResponse:
     """
+    URL: POST /ssh-key/add
+
     Add an SSH key.
 
     An SSH key is associated with a single user.
@@ -1143,14 +1294,18 @@ async def ssh_key_add(data: models.api.SshKeyAddArgs) 
-> DictResponse:
     ).model_dump(), 201
 
 
[email protected]("/ssh-key/delete", methods=["POST"])
[email protected]
 @rate_limiter.rate_limit(10, datetime.timedelta(hours=1))
 @jwtoken.require
 @quart_schema.security_scheme([{"BearerAuth": []}])
-@quart_schema.validate_request(models.api.SshKeyDeleteArgs)
 @quart_schema.validate_response(models.api.SshKeyDeleteResults, 201)
-async def ssh_key_delete(data: models.api.SshKeyDeleteArgs) -> DictResponse:
+async def ssh_key_delete(
+    _ssh_key_delete: Literal["ssh-key/delete"],
+    data: models.api.SshKeyDeleteArgs,
+) -> DictResponse:
     """
+    URL: POST /ssh-key/delete
+
     Delete an SSH key.
 
     An SSH key can only be deleted by the user who owns it.
@@ -1165,14 +1320,18 @@ async def ssh_key_delete(data: 
models.api.SshKeyDeleteArgs) -> DictResponse:
     ).model_dump(), 201
 
 
[email protected]("/ssh-keys/list/<asf_uid>")
[email protected]
 @rate_limiter.rate_limit(10, datetime.timedelta(hours=1))
-@quart_schema.validate_querystring(models.api.SshKeysListQuery)
-async def ssh_keys_list(asf_uid: str, query_args: models.api.SshKeysListQuery) 
-> DictResponse:
+async def ssh_keys_list(
+    _ssh_keys_list: Literal["ssh-keys/list"],
+    asf_uid: str,
+    query_args: models.api.SshKeysListQuery,
+) -> DictResponse:
     """
+    URL: GET /ssh-keys/list/<asf_uid>
+
     List SSH keys by ASF UID.
     """
-    _simple_check(asf_uid)
     _pagination_args_validate(query_args)
     via = sql.validate_instrumented_attribute
     async with db.session() as data:
@@ -1195,10 +1354,14 @@ async def ssh_keys_list(asf_uid: str, query_args: 
models.api.SshKeysListQuery) -
     ).model_dump(), 200
 
 
[email protected]("/tasks/list")
-@quart_schema.validate_querystring(models.api.TasksListQuery)
-async def tasks_list(query_args: models.api.TasksListQuery) -> DictResponse:
[email protected]
+async def tasks_list(
+    _tasks_list: Literal["tasks/list"],
+    query_args: models.api.TasksListQuery,
+) -> DictResponse:
     """
+    URL: GET /tasks/list
+
     List tasks.
     """
     _pagination_args_validate(query_args)
@@ -1222,10 +1385,14 @@ async def tasks_list(query_args: 
models.api.TasksListQuery) -> DictResponse:
     ).model_dump(), 200
 
 
[email protected]("/distribute/task/status", methods=["POST"])
-@quart_schema.validate_request(models.api.DistributeStatusUpdateArgs)
-async def update_distribution_task_status(data: 
models.api.DistributeStatusUpdateArgs) -> DictResponse:
[email protected]
+async def update_distribution_task_status(
+    _distribute_task_status: Literal["distribute/task/status"],
+    data: models.api.DistributeStatusUpdateArgs,
+) -> DictResponse:
     """
+    URL: POST /distribute/task/status
+
     Update the status of a distribution task
     """
     _payload, _asf_uid = await 
interaction.validate_trusted_jwt(data.publisher, data.jwt)
@@ -1244,13 +1411,17 @@ async def update_distribution_task_status(data: 
models.api.DistributeStatusUpdat
     ).model_dump(), 200
 
 
[email protected]("/user/info")
[email protected]
 @rate_limiter.rate_limit(10, datetime.timedelta(hours=1))
 @jwtoken.require
 @quart_schema.security_scheme([{"BearerAuth": []}])
 @quart_schema.validate_response(models.api.UserInfoResults, 200)
-async def user_info() -> DictResponse:
+async def user_info(
+    _user_info: Literal["user/info"],
+) -> DictResponse:
     """
+    URL: GET /user/info
+
     Get information about a user.
     """
     asf_uid = _jwt_asf_uid()
@@ -1264,11 +1435,15 @@ async def user_info() -> DictResponse:
     ).model_dump(), 200
 
 
[email protected]("/users/list")
[email protected]
 @rate_limiter.rate_limit(10, datetime.timedelta(hours=1))
 @quart_schema.validate_response(models.api.UsersListResults, 200)
-async def users_list() -> DictResponse:
+async def users_list(
+    _users_list: Literal["users/list"],
+) -> DictResponse:
     """
+    URL: GET /users/list
+
     List known users.
 
     This is not a list of all ASF users, but only those known to ATR.
@@ -1303,13 +1478,17 @@ async def users_list() -> DictResponse:
 
 
 # TODO: Add endpoints to allow users to vote
[email protected]("/vote/resolve", methods=["POST"])
[email protected]
 @jwtoken.require
 @quart_schema.security_scheme([{"BearerAuth": []}])
-@quart_schema.validate_request(models.api.VoteResolveArgs)
 @quart_schema.validate_response(models.api.VoteResolveResults, 200)
-async def vote_resolve(data: models.api.VoteResolveArgs) -> DictResponse:
+async def vote_resolve(
+    _vote_resolve: Literal["vote/resolve"],
+    data: models.api.VoteResolveArgs,
+) -> DictResponse:
     """
+    URL: POST /vote/resolve
+
     Resolve a vote.
 
     A vote can be resolved by passing or failing.
@@ -1338,13 +1517,17 @@ async def vote_resolve(data: 
models.api.VoteResolveArgs) -> DictResponse:
     ).model_dump(), 200
 
 
[email protected]("/vote/start", methods=["POST"])
[email protected]
 @jwtoken.require
 @quart_schema.security_scheme([{"BearerAuth": []}])
-@quart_schema.validate_request(models.api.VoteStartArgs)
 @quart_schema.validate_response(models.api.VoteStartResults, 201)
-async def vote_start(data: models.api.VoteStartArgs) -> DictResponse:
+async def vote_start(
+    _vote_start: Literal["vote/start"],
+    data: models.api.VoteStartArgs,
+) -> DictResponse:
     """
+    URL: POST /vote/start
+
     Start a vote.
     """
     asf_uid = _jwt_asf_uid()
@@ -1381,13 +1564,17 @@ async def vote_start(data: models.api.VoteStartArgs) -> 
DictResponse:
     ).model_dump(), 201
 
 
[email protected]("/vote/tabulate", methods=["POST"])
[email protected]
 @jwtoken.require
 @quart_schema.security_scheme([{"BearerAuth": []}])
-@quart_schema.validate_request(models.api.VoteTabulateArgs)
 @quart_schema.validate_response(models.api.VoteTabulateResults, 200)
-async def vote_tabulate(data: models.api.VoteTabulateArgs) -> DictResponse:
+async def vote_tabulate(
+    _vote_tabulate: Literal["vote/tabulate"],
+    data: models.api.VoteTabulateArgs,
+) -> DictResponse:
     """
+    URL: POST /vote/tabulate
+
     Tabulate a vote.
     """
     # asf_uid = _jwt_asf_uid()
@@ -1491,9 +1678,3 @@ def _pagination_args_validate(query_args: Any) -> None:
             raise exceptions.BadRequest("Maximum offset of 1000000 exceeded")
         elif offset < 0:
             raise exceptions.BadRequest("Minimum offset less than 0 is 
nonsense")
-
-
-def _simple_check(*args: str | None) -> None:
-    for arg in args:
-        if arg == "None":
-            raise exceptions.BadRequest("Argument cannot be the string 'None'")
diff --git a/atr/blueprints/api.py b/atr/blueprints/api.py
index 3c84266f..1219b791 100644
--- a/atr/blueprints/api.py
+++ b/atr/blueprints/api.py
@@ -14,8 +14,12 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import dataclasses
 import datetime
+import inspect
 import sys
+import time
+from collections.abc import Callable
 from types import ModuleType
 from typing import Any
 
@@ -27,16 +31,104 @@ import quart_rate_limiter as rate_limiter
 import quart_schema
 import werkzeug.exceptions as exceptions
 
-_BLUEPRINT = quart.Blueprint("api_blueprint", __name__, url_prefix="/api")
+import atr.blueprints.common as common
+import atr.log as log
 
-route = _BLUEPRINT.route
+_BLUEPRINT_NAME = "api_blueprint"
+_BLUEPRINT = quart.Blueprint(_BLUEPRINT_NAME, __name__, url_prefix="/api")
+_routes: list[str] = []
+_QUART_ATTRIBUTES = [
+    quart_schema.validation.QUART_SCHEMA_HEADERS_ATTRIBUTE,
+    quart_schema.validation.QUART_SCHEMA_RESPONSE_ATTRIBUTE,
+    quart_schema.openapi.QUART_SCHEMA_SECURITY_ATTRIBUTE,
+    quart_schema.openapi.QUART_SCHEMA_TAG_ATTRIBUTE,
+    quart_schema.openapi.QUART_SCHEMA_HIDDEN_ATTRIBUTE,
+    quart_schema.openapi.QUART_SCHEMA_DEPRECATED_ATTRIBUTE,
+    quart_schema.openapi.QUART_SCHEMA_OPERATION_ID_ATTRIBUTE,
+]
 
 
 def register(app: base.QuartApp) -> tuple[ModuleType, list[str]]:
     import atr.api as api
 
     app.register_blueprint(_BLUEPRINT)
-    return api, []
+    return api, _routes
+
+
+def typed(func: Callable[..., Any]) -> Callable[..., Any]:
+    """Decorator that derives the URL path from the function's type 
annotations.
+
+    - Literal["..."] parameters become literal path segments
+    - safe.ProjectName / safe.VersionName parameters are validated via cache/DB
+    - pydantic.BaseModel subclass parameters are parsed from the JSON request 
body
+    - dataclass parameters are parsed from the query string
+    - str | None parameters create optional URL segments (two routes 
registered)
+    - int, float, str use Quart's built-in type converters
+    - HTTP method is POST if a body param is present, GET otherwise
+    """
+    original = inspect.unwrap(func)
+    path, validated_params, literal_params, body_param, query_param, 
optional_params = common.build_api_path(original)
+    method = "POST" if body_param is not None else "GET"
+
+    async def wrapper(*_args: Any, **kwargs: Any) -> Any:
+        await common.run_validators(kwargs, validated_params)
+        kwargs.update(literal_params)
+
+        if body_param is not None:
+            body_name, body_cls = body_param
+            json_data = await quart.request.get_json()
+            try:
+                kwargs[body_name] = body_cls.model_validate(json_data)
+            except pydantic.ValidationError as e:
+                raise quart_schema.RequestSchemaValidationError(e) from e
+
+        if query_param is not None:
+            query_name, query_cls = query_param
+            kwargs[query_name] = _parse_query_args(query_cls, 
quart.request.args)
+
+        start_time_ns = time.perf_counter_ns()
+        response = await func(**kwargs)
+        end_time_ns = time.perf_counter_ns()
+        total_ms = (end_time_ns - start_time_ns) // 1_000_000
+        log.performance(f"API {method} {path} {original.__name__} = 0 0 
{total_ms}")
+
+        return response
+
+    endpoint = original.__module__.replace(".", "_") + "_" + original.__name__
+    wrapper.__name__ = original.__name__
+    wrapper.__doc__ = original.__doc__
+
+    # Replace the original quart request decorators
+    if query_param is not None:
+        wrapper = quart_schema.validate_querystring(query_param[1])(wrapper)
+    if body_param is not None:
+        wrapper = quart_schema.validate_request(body_param[1])(wrapper)
+
+    # Examine `func` for quart attributes and re-attach to the wrapped function
+    # This makes sure the OpenAPI documentation is preserved
+    # Note: we don't update querystring or request as they're processed above 
using our detected types
+    for attr in _QUART_ATTRIBUTES:
+        if hasattr(func, attr):
+            setattr(wrapper, attr, getattr(func, attr))
+
+    # If there are optional params, we need two routes, one with the optional 
params omitted
+    # and one with them all present.
+    # AM 26/03/03: This actually only handles the case where there's some 
required and a single optional, but
+    # that's the only case that existed in the original code. Theoretically we 
could count the optional params and
+    # generate the correct number of routes, but that's a lot of effort for 
little gain right now
+    if optional_params:
+        required_segments = [
+            seg for seg in path.strip("/").split("/") if not any(seg == 
f"<{name}>" for name in optional_params)
+        ]
+        short_path = "/" + "/".join(required_segments)
+        defaults = {name: None for name in optional_params}
+        _BLUEPRINT.add_url_rule(short_path, endpoint=endpoint, 
view_func=wrapper, methods=[method], defaults=defaults)
+        _BLUEPRINT.add_url_rule(path, endpoint=endpoint + "_full", 
view_func=wrapper, methods=[method])
+    else:
+        _BLUEPRINT.add_url_rule(path, endpoint=endpoint, view_func=wrapper, 
methods=[method])
+
+    common.register_route(original, "api", _routes)
+    return wrapper
 
 
 @_BLUEPRINT.before_request
@@ -81,6 +173,35 @@ async def _handle_request_validation(err: 
quart_schema.RequestSchemaValidationEr
     return _json_error("Input validation failed", 400, {"validation_details": 
verr.errors()})
 
 
+def _coerce_query_field(raw: str, field_type: Any, field_name: str) -> Any:
+    """Coerce a raw query string value to the expected field type."""
+    if field_type is str or field_type == "str":
+        return raw
+    if field_type is int or field_type == "int":
+        try:
+            return int(raw)
+        except ValueError:
+            raise exceptions.BadRequest(f"Query parameter {field_name!r} must 
be an integer")
+    if field_type is bool or field_type == "bool":
+        return raw.lower() in ("true", "1", "yes")
+    return raw
+
+
+def _parse_query_args(query_cls: type, args: Any) -> Any:
+    """Parse query string parameters into a dataclass instance."""
+    field_values: dict[str, Any] = {}
+    for field in dataclasses.fields(query_cls):
+        raw = args.get(field.name)
+        if raw is None:
+            if field.default is not dataclasses.MISSING:
+                field_values[field.name] = field.default
+            elif field.default_factory is not dataclasses.MISSING:
+                field_values[field.name] = field.default_factory()
+            continue
+        field_values[field.name] = _coerce_query_field(raw, field.type, 
field.name)
+    return query_cls(**field_values)
+
+
 def _json_error(
     message: str, status_code: int | None, extra: dict[str, Any] | None = None
 ) -> tuple[quart.Response, int]:
diff --git a/atr/blueprints/common.py b/atr/blueprints/common.py
index 572aefe7..f9742d7a 100644
--- a/atr/blueprints/common.py
+++ b/atr/blueprints/common.py
@@ -15,12 +15,16 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import dataclasses
 import inspect
+import types
+import typing
 from collections.abc import Callable
 from typing import Annotated, Any, Literal, TypeAliasType, get_args, 
get_origin, get_type_hints
 
 import asfquart.base as base
 import asfquart.session
+import pydantic
 
 import atr.cache as cache
 import atr.db as db
@@ -66,8 +70,11 @@ def build_path(
 ) -> tuple[str, list[tuple[str, type]], dict[str, str], tuple[str, type] | 
None, bool]:
     """Inspect a function's type hints to build a URL path and a validation 
plan.
 
+    Accepts URL path params for data, Literal strings for plain URL text, and 
Form params for POST bodies
+    Validates that the session param (web.Committer or web.Public) is first, 
and that only one Form param is allowed
+
     Returns (path, validated_params, literal_params, form_param, public) where:
-    - validated_params: (name, type) pairs for URL params validated via 
cache/DB
+    - validated_params: (name, type) pairs for URL params to be validated with 
cache/DB
     - literal_params: param name → literal string value for Literal["..."] 
params
     - form_param: (name, type) for the single form.Form subclass param, or None
     - public: True if the session type is web.Public
@@ -108,6 +115,76 @@ def build_path(
     return path, validated_params, literal_params, form_param, public
 
 
+def build_api_path(
+    func: Callable[..., Any],
+) -> tuple[
+    str,
+    list[tuple[str, type]],
+    dict[str, str],
+    tuple[str, type[pydantic.BaseModel]] | None,
+    tuple[str, type] | None,
+    list[str],
+]:
+    """Inspect a function's type hints to build a URL path for an API route.
+
+    Accepts URL path params for data, Literal strings for plain URL text, 
dataclasses for GET query params
+    and Pydantic model params for POST bodies
+
+    Returns (path, validated_params, literal_params, body_param, query_param,
+    optional_params) where:
+    - path: the URL rule built by joining the per-parameter segments with "/"
+    - validated_params: (name, type) pairs for URL params to be validated with 
cache/DB
+    - literal_params: param name -> literal string value for Literal["..."] 
params
+    - body_param: (name, type) for the single BaseModel param, or None
+    - query_param: (name, type) for the single dataclass param, or None
+    - optional_params: param names whose type is T | None with a default of 
None
+    """
+    hints = get_type_hints(func, include_extras=True)
+    sig = inspect.signature(func)
+    params = list(sig.parameters.keys())
+    segments: list[str] = []
+    validated_params: list[tuple[str, type]] = []
+    literal_params: dict[str, str] = {}
+    body_param: tuple[str, type[pydantic.BaseModel]] | None = None
+    query_param: tuple[str, type] | None = None
+    optional_params: list[str] = []
+
+    for param_name in params:
+        hint = hints.get(param_name)
+        if hint is None:
+            raise TypeError(f"Parameter {param_name!r} in {func.__name__} has 
no type annotation")
+
+        if _is_body_type(hint):
+            if body_param is not None:
+                raise TypeError(f"Parameter {param_name!r} in {func.__name__}: 
only one body type is allowed")
+            body_param = (param_name, hint)
+            continue
+
+        if _is_query_type(hint):
+            if query_param is not None:
+                raise TypeError(f"Parameter {param_name!r} in {func.__name__}: 
only one query type is allowed")
+            query_param = (param_name, hint)
+            continue
+
+        inner, is_optional = _unwrap_optional(hint)
+        if is_optional:
+            segment = _param_to_segment(param_name, inner, func.__name__)
+            segments.append(segment)
+            optional_params.append(param_name)
+            # Note - this means that safe types which are optional will not 
get validated - no current use case for this
+            continue
+
+        segment = _param_to_segment(param_name, hint, func.__name__)
+        segments.append(segment)
+        if hint in VALIDATED_TYPES:
+            validated_params.append((param_name, hint))
+        elif get_origin(hint) is Literal:
+            literal_params[param_name] = str(get_args(hint)[0])
+
+    path = "/" + "/".join(segments)
+    return path, validated_params, literal_params, body_param, query_param, 
optional_params
+
+
 def register_route(func: Callable[..., Any], prefix: str, routes: list[str]) 
-> None:
     module_name = func.__module__.split(".")[-1]
     routes.append(f"{prefix}.{module_name}.{func.__name__}")
@@ -151,6 +228,15 @@ async def validate_version(project_name: safe.ProjectName, 
raw: str) -> safe.Ver
     return safe.VersionName(release.version)
 
 
+def _is_body_type(hint: Any) -> bool:
+    """Check if a type hint is a pydantic BaseModel subclass (but not a 
Form)."""
+    if not isinstance(hint, type):
+        return False
+    if issubclass(hint, form.Form):
+        return False
+    return issubclass(hint, pydantic.BaseModel)
+
+
 def _is_form_type(hint: Any) -> bool:
     """Check if a type hint represents a form.Form subclass or Annotated 
discriminated union of forms."""
     if isinstance(hint, type) and issubclass(hint, form.Form):
@@ -164,6 +250,11 @@ def _is_form_type(hint: Any) -> bool:
     return False
 
 
+def _is_query_type(hint: Any) -> bool:
+    """Check if a type hint is a dataclass (used for query-string params)."""
+    return dataclasses.is_dataclass(hint) and isinstance(hint, type)
+
+
 def _param_to_segment(param_name: str, hint: Any, func_name: str) -> str:
     """Convert a single parameter's type hint into a URL path segment."""
     if get_origin(hint) is Literal:
@@ -175,3 +266,19 @@ def _param_to_segment(param_name: str, hint: Any, 
func_name: str) -> str:
     if hint is str:
         return f"<{param_name}>"
     raise TypeError(f"Parameter {param_name!r} in {func_name} has unsupported 
type {hint!r}")
+
+
+def _unwrap_optional(hint: Any) -> tuple[Any, bool]:
+    """If hint is T | None, return (T, True). Otherwise return (hint, False).
+
+    Handles both ``str | None`` (types.UnionType) and ``typing.Optional[str]``
+    (typing.Union).
+    """
+    origin = get_origin(hint)
+    if origin is not types.UnionType and origin is not typing.Union:
+        return hint, False
+    args = get_args(hint)
+    non_none = [a for a in args if a is not type(None)]
+    if len(non_none) == 1 and type(None) in args:
+        return non_none[0], True
+    return hint, False


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to