This is an automated email from the ASF dual-hosted git repository.

sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git


The following commit(s) were added to refs/heads/main by this push:
     new a8af58e  Make the token forms more type safe
a8af58e is described below

commit a8af58ec733ac0598022eac2090ee10417e54f22
Author: Sean B. Palmer <[email protected]>
AuthorDate: Mon Nov 10 19:24:43 2025 +0000

    Make the token forms more type safe
---
 atr/get/tokens.py             | 110 +++++++++++++++-
 atr/htm.py                    |   5 +
 atr/post/tokens.py            |  56 +++++++-
 atr/shared/tokens.py          | 297 +++---------------------------------------
 atr/storage/writers/tokens.py |  15 +++
 atr/web.py                    |  17 +++
 6 files changed, 219 insertions(+), 281 deletions(-)

diff --git a/atr/get/tokens.py b/atr/get/tokens.py
index 21a8d18..7779e7c 100644
--- a/atr/get/tokens.py
+++ b/atr/get/tokens.py
@@ -15,12 +15,116 @@
 # specific language governing permissions and limitations
 # under the License.
 
-
 import atr.blueprints.get as get
+import atr.form as form
+import atr.htm as htm
+import atr.models.sql as sql
+import atr.post as post
 import atr.shared as shared
+import atr.storage as storage
+import atr.template as template
+import atr.util as util
 import atr.web as web
 
 
 @get.committer("/tokens")
-async def tokens(session: web.Committer) -> str | web.WerkzeugResponse:
-    return await shared.tokens.tokens(session)
+async def tokens(session: web.Committer) -> str:
+    async with storage.read_as_foundation_committer() as rafc:
+        tokens_list = await rafc.tokens.own_personal_access_tokens()
+        most_recent_pat = await rafc.tokens.most_recent_jwt_pat()
+
+    page = htm.Block()
+    page.h1["Tokens"]
+    page.h2["Personal Access Tokens (PATs)"]
+    page.p[
+        """Generate tokens for API access. For security, the plaintext
+        token is shown only once when you create it. You can revoke tokens
+        you no longer need."""
+    ]
+    add_form = form.render(
+        model_cls=shared.tokens.AddTokenForm,
+        form_classes=".mb-0",
+        submit_label="Generate token",
+    )
+    page.div(".card.mb-4")[
+        htm.div(".card-header")["Generate new token"],
+        htm.div(".card-body")[add_form],
+    ]
+    _build_tokens_table(page, tokens_list)
+
+    page.h2["JSON Web Token (JWT)"]
+    jwt_section = htm.Block()
+    jwt_section.p[
+        """Generate a JSON Web Token (JWT) to authenticate calls to ATR's
+        private API routes. Treat the token like a password and include it
+        in the Authorization header as a Bearer token when invoking the
+        protected endpoints."""
+    ]
+    form.render_block(
+        jwt_section,
+        model_cls=form.Empty,
+        action=util.as_url(post.tokens.jwt_post),
+        form_classes="#issue-jwt-form",
+        submit_label="Generate JWT",
+        use_error_data=False,
+    )
+    jwt_section.pre(id="jwt-output", class_="d-none mt-2 p-3 atr-word-wrap border rounded w-50")
+    if most_recent_pat and most_recent_pat.last_used:
+        jwt_section.p(".mt-3")[
+            "You most recently used a PAT to issue a JWT at ",
+            htm.strong[util.format_datetime(most_recent_pat.last_used) + "Z"],
+            ", using the PAT labelled ",
+            htm.code[most_recent_pat.label or "[Untitled]"],
+            ".",
+        ]
+    page.append(jwt_section)
+
+    return await template.render_sync(
+        "blank.html",
+        title="Tokens",
+        description="Manage your PATs and JWTs.",
+        content=page.collect(),
+        javascripts=[util.static_path("js", "create-a-jwt.js")],
+    )
+
+
+def _build_tokens_table(page: htm.Block, tokens_list: list[sql.PersonalAccessToken]) -> None:
+    if not tokens_list:
+        page.p["No tokens found."]
+        return
+
+    tbody = htm.Block(htm.tbody)
+    for t in tokens_list:
+        if not t.id:
+            continue
+
+        delete_form = form.render(
+            model_cls=shared.tokens.DeleteTokenForm,
+            action=util.as_url(post.tokens.tokens),
+            form_classes=".mb-0",
+            submit_classes="btn-sm btn-danger",
+            submit_label="Delete",
+            defaults={"token_id": t.id},
+            use_error_data=False,
+            empty=True,
+        )
+        tbody.tr(".align-middle")[
+            htm.td[t.label or ""],
+            htm.td[util.format_datetime(t.created)],
+            htm.td[util.format_datetime(t.expires)],
+            htm.td[util.format_datetime(t.last_used) if t.last_used else "Never"],
+            htm.td[delete_form],
+        ]
+
+    page.table(".table.table-striped")[
+        htm.thead[
+            htm.tr[
+                htm.th["Label"],
+                htm.th["Created"],
+                htm.th["Expires"],
+                htm.th["Last used"],
+                htm.th[""],
+            ]
+        ],
+        tbody.collect(),
+    ]
diff --git a/atr/htm.py b/atr/htm.py
index 530800d..6df198a 100644
--- a/atr/htm.py
+++ b/atr/htm.py
@@ -249,6 +249,11 @@ class Block:
         self.__check_parent("title", {"head", "html"})
         return BlockElementCallable(self, title)
 
+    @property
+    def tr(self) -> BlockElementCallable:
+        self.__check_parent("tr", {"tbody", "table"})
+        return BlockElementCallable(self, tr)
+
     @property
     def ul(self) -> BlockElementCallable:
         self.__check_parent("ul", {"body", "div"})
diff --git a/atr/post/tokens.py b/atr/post/tokens.py
index cae3e60..4d30d70 100644
--- a/atr/post/tokens.py
+++ b/atr/post/tokens.py
@@ -15,13 +15,24 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import datetime
+import hashlib
+import secrets
+from typing import Final
+
+import quart
 
 import atr.blueprints.post as post
+import atr.get as get
+import atr.htm as htm
 import atr.jwtoken as jwtoken
 import atr.shared as shared
+import atr.storage as storage
 import atr.util as util
 import atr.web as web
 
+_EXPIRY_DAYS: Final[int] = 180
+
 
 @post.committer("/tokens/jwt")
 async def jwt_post(session: web.Committer) -> web.QuartResponse:
@@ -32,5 +43,46 @@ async def jwt_post(session: web.Committer) -> web.QuartResponse:
 
 
 @post.committer("/tokens")
-async def tokens(session: web.Committer) -> str | web.WerkzeugResponse:
-    return await shared.tokens.tokens(session)
+@post.form(shared.tokens.TokenForm)
+async def tokens(session: web.Committer, token_form: shared.tokens.TokenForm) -> web.WerkzeugResponse:
+    match token_form:
+        case shared.tokens.AddTokenForm() as add_form:
+            return await _add_token(session, add_form)
+
+        case shared.tokens.DeleteTokenForm() as delete_form:
+            return await _delete_token(session, delete_form)
+
+
+async def _add_token(session: web.Committer, add_form: shared.tokens.AddTokenForm) -> web.WerkzeugResponse:
+    plaintext = secrets.token_urlsafe(32)
+    token_hash = hashlib.sha3_256(plaintext.encode()).hexdigest()
+    created = datetime.datetime.now(datetime.UTC)
+    expires = created + datetime.timedelta(days=_EXPIRY_DAYS)
+
+    async with storage.write() as write:
+        wafc = write.as_foundation_committer()
+        await wafc.tokens.add_token(
+            session.uid,
+            token_hash,
+            created,
+            expires,
+            add_form.label or None,
+        )
+
+    await web.flash_success(
+        htm.p[
+            htm.strong["Your new token"],
+            " is ",
+            htm.code(".bg-light.border.rounded.px-1")[plaintext],
+        ],
+        htm.p(".mb-0")["Copy it now as you will not be able to see it again."],
+    )
+    return await session.redirect(get.tokens.tokens)
+
+
+async def _delete_token(session: web.Committer, delete_form: shared.tokens.DeleteTokenForm) -> web.WerkzeugResponse:
+    async with storage.write(session) as write:
+        wafc = write.as_foundation_committer()
+        await wafc.tokens.delete_token(delete_form.token_id)
+    await quart.flash("Token deleted successfully", "success")
+    return await session.redirect(get.tokens.tokens)
diff --git a/atr/shared/tokens.py b/atr/shared/tokens.py
index 28f8764..3bb9c9c 100644
--- a/atr/shared/tokens.py
+++ b/atr/shared/tokens.py
@@ -15,289 +15,34 @@
 # specific language governing permissions and limitations
 # under the License.
 
+from typing import Annotated, Literal
 
-import datetime
-import hashlib
-import secrets
-import time
-from typing import Final
+import pydantic
 
-import markupsafe
-import quart
-import sqlmodel
-import werkzeug.datastructures as datastructures
-import wtforms.fields.core as core
+import atr.form as form
 
-import atr.db as db
-import atr.forms as forms
-import atr.htm as htm
-import atr.jwtoken as jwtoken
-import atr.log as log
-import atr.models.sql as sql
-import atr.storage as storage
-import atr.template as templates
-import atr.util as util
-import atr.web as web
+type ADD_TOKEN = Literal["add_token"]
+type DELETE_TOKEN = Literal["delete_token"]
 
-_EXPIRY_DAYS: Final[int] = 180
 
+class AddTokenForm(form.Form):
+    variant: ADD_TOKEN = form.value(ADD_TOKEN)
+    label: str = form.label("Label")
 
-type Fragment = htm.Element | core.Field | str
+    @pydantic.field_validator("label", mode="after")
+    @classmethod
+    def validate_label_length(cls, value: str) -> str:
+        if len(value) > 100:
+            raise ValueError("Label must be 100 characters or less")
+        return value
 
 
-class AddTokenForm(forms.Typed):
-    label = forms.string("Label", optional=True, validators=forms.length(max=100), placeholder="E.g. CI bot")
-    submit = forms.submit("Generate token")
+class DeleteTokenForm(form.Form):
+    variant: DELETE_TOKEN = form.value(DELETE_TOKEN)
+    token_id: form.Int = form.label("Token ID", widget=form.Widget.HIDDEN)
 
 
-class DeleteTokenForm(forms.Typed):
-    token_id = forms.hidden()
-    submit = forms.submit("Delete")
-
-
-class IssueJWTForm(forms.Typed):
-    submit = forms.submit("Generate JWT")
-
-
-async def tokens(session: web.Committer) -> str | web.WerkzeugResponse:
-    request_form = await quart.request.form
-
-    if is_post := quart.request.method == "POST":
-        maybe_response = await _handle_post(session, request_form)
-        if maybe_response is not None:
-            return maybe_response
-
-    add_form = await AddTokenForm.create_form(data=request_form if is_post else None)
-    issue_form = await IssueJWTForm.create_form(data=request_form if is_post else None)
-
-    start = time.perf_counter_ns()
-    async with storage.read_as_foundation_committer() as rafc:
-        tokens_list = await rafc.tokens.own_personal_access_tokens()
-        most_recent_pat = await rafc.tokens.most_recent_jwt_pat()
-    end = time.perf_counter_ns()
-    log.info("Tokens list fetched in %dms", (end - start) / 1_000_000)
-
-    start = time.perf_counter_ns()
-    add_form_elem = _build_add_form_element(add_form)
-    issue_form_elem = _build_issue_jwt_form_element(issue_form)
-    tokens_table = _build_tokens_table(tokens_list)
-
-    issue_jwt = [
-        htm.p[
-            """Generate a JSON Web Token (JWT) to authenticate calls to ATR's
-            private API routes. Treat the token like a password and include it
-            in the Authorization header as a Bearer token when invoking the
-            protected endpoints."""
-            # p["Example"],
-        ],
-        issue_form_elem,
-        htm.pre(id="jwt-output", class_="d-none mt-2 p-3 atr-word-wrap border rounded w-50"),
-    ]
-
-    if most_recent_pat and most_recent_pat.last_used:
-        issue_jwt.append(
-            htm.p(".mt-3")[
-                "You most recently used a PAT to issue a JWT at ",
-                htm.strong[util.format_datetime(most_recent_pat.last_used) + "Z"],
-                ", using the PAT labelled ",
-                htm.code[most_recent_pat.label or "[Untitled]"],
-                ".",
-            ]
-        )
-
-    content_elem = htm.div[
-        htm.h1["Tokens"],
-        htm.h2["Personal Access Tokens (PATs)"],
-        htm.p[
-            """Generate tokens for API access. For security, the plaintext
-            token is shown only once when you create it. You can revoke tokens
-            you no longer need."""
-        ],
-        htm.div(".card.mb-4")[
-            htm.div(".card-header")["Generate new token"],
-            htm.div(".card-body")[add_form_elem],
-        ],
-        tokens_table,
-        htm.h2["JSON Web Token (JWT)"],
-        htm.div[issue_jwt],
-    ]
-    end = time.perf_counter_ns()
-    log.info("Content elem built in %dms", (end - start) / 1_000_000)
-
-    start = time.perf_counter_ns()
-    rendered = await templates.render(
-        "blank.html",
-        title="Tokens",
-        description="Manage your PATs and JWTs.",
-        content=content_elem,
-        javascripts=[util.static_path("js", "create-a-jwt.js")],
-    )
-    end = time.perf_counter_ns()
-    log.info("Rendered in %dms", (end - start) / 1_000_000)
-
-    return rendered
-
-
-def _as_markup(fragment: Fragment) -> markupsafe.Markup:
-    return markupsafe.Markup(str(fragment))
-
-
-def _build_add_form_element(a_form: AddTokenForm) -> markupsafe.Markup:
-    import atr.post as post
-
-    elem = htm.form(method="post", action=util.as_url(post.tokens.tokens))[
-        _as_markup(a_form.csrf_token),
-        htm.div(".mb-3")[
-            a_form.label.label,
-            a_form.label(class_="form-control"),
-        ],
-        a_form.submit(class_="btn btn-primary"),
-    ]
-    return _as_markup(elem)
-
-
-def _build_delete_form_element(token_id: int | None) -> markupsafe.Markup:
-    import atr.post as post
-
-    d_form = DeleteTokenForm()
-    d_form.token_id.data = "" if token_id is None else str(token_id)
-    elem = htm.form(".mb-0", method="post", action=util.as_url(post.tokens.tokens))[
-        _as_markup(d_form.csrf_token),
-        _as_markup(d_form.token_id),
-        d_form.submit(class_="btn btn-sm btn-danger"),
-    ]
-    return _as_markup(elem)
-
-
-def _build_issue_jwt_form_element(j_form: IssueJWTForm) -> markupsafe.Markup:
-    import atr.post as post
-
-    elem = htm.form("#issue-jwt-form", method="post", action=util.as_url(post.tokens.jwt_post))[
-        _as_markup(j_form.csrf_token),
-        j_form.submit(class_="btn btn-primary"),
-    ]
-    return _as_markup(elem)
-
-
-def _build_tokens_table(tokens_list: list[sql.PersonalAccessToken]) -> markupsafe.Markup:
-    if not tokens_list:
-        return _as_markup(htm.p["No tokens found."])
-
-    rows = [
-        htm.tr(".align-middle")[
-            htm.td[t.label or ""],
-            htm.td[util.format_datetime(t.created)],
-            htm.td[util.format_datetime(t.expires)],
-            htm.td[util.format_datetime(t.last_used) if t.last_used else "Never"],
-            htm.td[_build_delete_form_element(t.id)],
-        ]
-        for t in tokens_list
-    ]
-
-    table_elem = htm.table(".table.table-striped")[
-        htm.thead[
-            htm.tr[
-                htm.th["Label"],
-                htm.th["Created"],
-                htm.th["Expires"],
-                htm.th["Last used"],
-                htm.th[""],
-            ]
-        ],
-        htm.tbody[rows],
-    ]
-    return _as_markup(table_elem)
-
-
-async def _create_token(uid: str, label: str | None) -> str:
-    plaintext = secrets.token_urlsafe(32)
-    token_hash = hashlib.sha3_256(plaintext.encode()).hexdigest()
-    created = datetime.datetime.now(datetime.UTC)
-    expires = created + datetime.timedelta(days=_EXPIRY_DAYS)
-
-    async with storage.write() as write:
-        wafc = write.as_foundation_committer()
-        await wafc.tokens.add_token(uid, token_hash, created, expires, label)
-    return plaintext
-
-
-@db.session_commit_function
-async def _delete_token(data: db.Session, uid: str, token_id: int) -> None:
-    pat = await data.query_one_or_none(
-        sqlmodel.select(sql.PersonalAccessToken).where(
-            sql.PersonalAccessToken.id == token_id,
-            sql.PersonalAccessToken.asfuid == uid,
-        )
-    )
-    if pat:
-        await data.delete(pat)
-
-
-async def _handle_post(session: web.Committer, request_form: datastructures.MultiDict) -> web.WerkzeugResponse | None:
-    if "token_id" in request_form:
-        return await _handle_delete_token_post(session, request_form)
-
-    if "label" in request_form:
-        return await _handle_add_token_post(session, request_form)
-
-    return await _handle_issue_jwt_post(session, request_form)
-
-
-async def _handle_add_token_post(
-    session: web.Committer, request_form: datastructures.MultiDict
-) -> web.WerkzeugResponse | None:
-    import atr.get as get
-
-    add_form = await AddTokenForm.create_form(data=request_form)
-    if await add_form.validate_on_submit():
-        label_val = str(add_form.label.data) if add_form.label.data else None
-        plaintext = await _create_token(session.uid, label_val)
-        success_msg = htm.div[
-            htm.p[
-                htm.strong["Your new token"],
-                " is ",
-                htm.code(".bg-light.border.rounded.px-1")[plaintext],
-            ],
-            htm.p(".mb-0")["Copy it now as you will not be able to see it again."],
-        ]
-        await quart.flash(_as_markup(success_msg), "success")
-        return await session.redirect(get.tokens.tokens)
-
-    return None
-
-
-async def _handle_delete_token_post(
-    session: web.Committer, request_form: datastructures.MultiDict
-) -> web.WerkzeugResponse | None:
-    import atr.get as get
-
-    del_form = await DeleteTokenForm.create_form(data=request_form)
-    if await del_form.validate_on_submit():
-        token_id_val = int(str(del_form.token_id.data))
-        await _delete_token(session.uid, token_id_val)
-        await quart.flash("Token deleted successfully", "success")
-        return await session.redirect(get.tokens.tokens)
-
-    await quart.flash("Invalid delete request", "error")
-    return None
-
-
-async def _handle_issue_jwt_post(
-    session: web.Committer, request_form: datastructures.MultiDict
-) -> web.WerkzeugResponse | None:
-    import atr.get as get
-
-    issue_form = await IssueJWTForm.create_form(data=request_form)
-    if await issue_form.validate_on_submit():
-        jwt_token = jwtoken.issue(session.uid)
-        success_msg = htm.div[
-            htm.p[
-                htm.strong["Your new JWT"],
-                " is:",
-            ],
-            htm.p[htm.code(".bg-light.border.rounded.px-1.atr-word-wrap")[jwt_token],],
-        ]
-        await quart.flash(_as_markup(success_msg), "success")
-        return await session.redirect(get.tokens.tokens)
-
-    return None
+type TokenForm = Annotated[
+    AddTokenForm | DeleteTokenForm,
+    form.DISCRIMINATOR,
+]
diff --git a/atr/storage/writers/tokens.py b/atr/storage/writers/tokens.py
index 6250743..e3ca4cf 100644
--- a/atr/storage/writers/tokens.py
+++ b/atr/storage/writers/tokens.py
@@ -67,6 +67,21 @@ class FoundationCommitter(GeneralPublic):
         await self.__data.commit()
         return pat
 
+    async def delete_token(self, token_id: int) -> None:
+        pat = await self.__data.query_one_or_none(
+            sqlmodel.select(sql.PersonalAccessToken).where(
+                sql.PersonalAccessToken.id == token_id,
+                sql.PersonalAccessToken.asfuid == self.__asf_uid,
+            )
+        )
+        if pat is not None:
+            await self.__data.delete(pat)
+            await self.__data.commit()
+            self.__write_as.append_to_audit_log(
+                asf_uid=self.__asf_uid,
+                token_id=token_id,
+            )
+
     async def issue_jwt(self, pat_text: str) -> str:
         pat_hash = hashlib.sha3_256(pat_text.encode()).hexdigest()
         pat = await self.__data.query_one_or_none(
diff --git a/atr/web.py b/atr/web.py
index da2c0b5..33b7cff 100644
--- a/atr/web.py
+++ b/atr/web.py
@@ -23,6 +23,7 @@ from typing import TYPE_CHECKING, Any, Protocol, TypeVar
 
 import asfquart.base as base
 import asfquart.session as session
+import markupsafe
 import pydantic_core
 import quart
 import werkzeug.datastructures.headers
@@ -278,6 +279,22 @@ class ZipResponse(quart.Response):
         super().__init__(response, status=status, headers=raw_headers, mimetype="application/zip")
 
 
+async def flash_error(*messages: htm.Element) -> None:
+    div = htm.Block(htm.div, classes=".atr-initial")
+    for message in messages:
+        div.append(message)
+
+    await quart.flash(markupsafe.Markup(str(div.collect())), category="error")
+
+
+async def flash_success(*messages: htm.Element) -> None:
+    div = htm.Block(htm.div, classes=".atr-initial")
+    for message in messages:
+        div.append(message)
+
+    await quart.flash(markupsafe.Markup(str(div.collect())), category="success")
+
+
 async def form_error(error: str) -> None:
     pass
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to