This is an automated email from the ASF dual-hosted git repository.
sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git
The following commit(s) were added to refs/heads/main by this push:
new a8af58e Make the token forms more type safe
a8af58e is described below
commit a8af58ec733ac0598022eac2090ee10417e54f22
Author: Sean B. Palmer <[email protected]>
AuthorDate: Mon Nov 10 19:24:43 2025 +0000
Make the token forms more type safe
---
atr/get/tokens.py | 110 +++++++++++++++-
atr/htm.py | 5 +
atr/post/tokens.py | 56 +++++++-
atr/shared/tokens.py | 297 +++---------------------------------------
atr/storage/writers/tokens.py | 15 +++
atr/web.py | 17 +++
6 files changed, 219 insertions(+), 281 deletions(-)
diff --git a/atr/get/tokens.py b/atr/get/tokens.py
index 21a8d18..7779e7c 100644
--- a/atr/get/tokens.py
+++ b/atr/get/tokens.py
@@ -15,12 +15,116 @@
# specific language governing permissions and limitations
# under the License.
-
import atr.blueprints.get as get
+import atr.form as form
+import atr.htm as htm
+import atr.models.sql as sql
+import atr.post as post
import atr.shared as shared
+import atr.storage as storage
+import atr.template as template
+import atr.util as util
import atr.web as web
@get.committer("/tokens")
-async def tokens(session: web.Committer) -> str | web.WerkzeugResponse:
- return await shared.tokens.tokens(session)
+async def tokens(session: web.Committer) -> str:
+ async with storage.read_as_foundation_committer() as rafc:
+ tokens_list = await rafc.tokens.own_personal_access_tokens()
+ most_recent_pat = await rafc.tokens.most_recent_jwt_pat()
+
+ page = htm.Block()
+ page.h1["Tokens"]
+ page.h2["Personal Access Tokens (PATs)"]
+ page.p[
+ """Generate tokens for API access. For security, the plaintext
+ token is shown only once when you create it. You can revoke tokens
+ you no longer need."""
+ ]
+ add_form = form.render(
+ model_cls=shared.tokens.AddTokenForm,
+ form_classes=".mb-0",
+ submit_label="Generate token",
+ )
+ page.div(".card.mb-4")[
+ htm.div(".card-header")["Generate new token"],
+ htm.div(".card-body")[add_form],
+ ]
+ _build_tokens_table(page, tokens_list)
+
+ page.h2["JSON Web Token (JWT)"]
+ jwt_section = htm.Block()
+ jwt_section.p[
+ """Generate a JSON Web Token (JWT) to authenticate calls to ATR's
+ private API routes. Treat the token like a password and include it
+ in the Authorization header as a Bearer token when invoking the
+ protected endpoints."""
+ ]
+ form.render_block(
+ jwt_section,
+ model_cls=form.Empty,
+ action=util.as_url(post.tokens.jwt_post),
+ form_classes="#issue-jwt-form",
+ submit_label="Generate JWT",
+ use_error_data=False,
+ )
+ jwt_section.pre(id="jwt-output", class_="d-none mt-2 p-3 atr-word-wrap border rounded w-50")
+ if most_recent_pat and most_recent_pat.last_used:
+ jwt_section.p(".mt-3")[
+ "You most recently used a PAT to issue a JWT at ",
+ htm.strong[util.format_datetime(most_recent_pat.last_used) + "Z"],
+ ", using the PAT labelled ",
+ htm.code[most_recent_pat.label or "[Untitled]"],
+ ".",
+ ]
+ page.append(jwt_section)
+
+ return await template.render_sync(
+ "blank.html",
+ title="Tokens",
+ description="Manage your PATs and JWTs.",
+ content=page.collect(),
+ javascripts=[util.static_path("js", "create-a-jwt.js")],
+ )
+
+
+def _build_tokens_table(page: htm.Block, tokens_list: list[sql.PersonalAccessToken]) -> None:
+ if not tokens_list:
+ page.p["No tokens found."]
+ return
+
+ tbody = htm.Block(htm.tbody)
+ for t in tokens_list:
+ if not t.id:
+ continue
+
+ delete_form = form.render(
+ model_cls=shared.tokens.DeleteTokenForm,
+ action=util.as_url(post.tokens.tokens),
+ form_classes=".mb-0",
+ submit_classes="btn-sm btn-danger",
+ submit_label="Delete",
+ defaults={"token_id": t.id},
+ use_error_data=False,
+ empty=True,
+ )
+ tbody.tr(".align-middle")[
+ htm.td[t.label or ""],
+ htm.td[util.format_datetime(t.created)],
+ htm.td[util.format_datetime(t.expires)],
+ htm.td[util.format_datetime(t.last_used) if t.last_used else "Never"],
+ htm.td[delete_form],
+ ]
+
+ page.table(".table.table-striped")[
+ htm.thead[
+ htm.tr[
+ htm.th["Label"],
+ htm.th["Created"],
+ htm.th["Expires"],
+ htm.th["Last used"],
+ htm.th[""],
+ ]
+ ],
+ tbody.collect(),
+ ]
diff --git a/atr/htm.py b/atr/htm.py
index 530800d..6df198a 100644
--- a/atr/htm.py
+++ b/atr/htm.py
@@ -249,6 +249,11 @@ class Block:
self.__check_parent("title", {"head", "html"})
return BlockElementCallable(self, title)
+ @property
+ def tr(self) -> BlockElementCallable:
+ self.__check_parent("tr", {"tbody", "table"})
+ return BlockElementCallable(self, tr)
+
@property
def ul(self) -> BlockElementCallable:
self.__check_parent("ul", {"body", "div"})
diff --git a/atr/post/tokens.py b/atr/post/tokens.py
index cae3e60..4d30d70 100644
--- a/atr/post/tokens.py
+++ b/atr/post/tokens.py
@@ -15,13 +15,24 @@
# specific language governing permissions and limitations
# under the License.
+import datetime
+import hashlib
+import secrets
+from typing import Final
+
+import quart
import atr.blueprints.post as post
+import atr.get as get
+import atr.htm as htm
import atr.jwtoken as jwtoken
import atr.shared as shared
+import atr.storage as storage
import atr.util as util
import atr.web as web
+_EXPIRY_DAYS: Final[int] = 180
+
@post.committer("/tokens/jwt")
async def jwt_post(session: web.Committer) -> web.QuartResponse:
@@ -32,5 +43,46 @@ async def jwt_post(session: web.Committer) -> web.QuartResponse:
@post.committer("/tokens")
-async def tokens(session: web.Committer) -> str | web.WerkzeugResponse:
- return await shared.tokens.tokens(session)
[email protected](shared.tokens.TokenForm)
+async def tokens(session: web.Committer, token_form: shared.tokens.TokenForm) -> web.WerkzeugResponse:
+ match token_form:
+ case shared.tokens.AddTokenForm() as add_form:
+ return await _add_token(session, add_form)
+
+ case shared.tokens.DeleteTokenForm() as delete_form:
+ return await _delete_token(session, delete_form)
+
+
+async def _add_token(session: web.Committer, add_form: shared.tokens.AddTokenForm) -> web.WerkzeugResponse:
+ plaintext = secrets.token_urlsafe(32)
+ token_hash = hashlib.sha3_256(plaintext.encode()).hexdigest()
+ created = datetime.datetime.now(datetime.UTC)
+ expires = created + datetime.timedelta(days=_EXPIRY_DAYS)
+
+ async with storage.write() as write:
+ wafc = write.as_foundation_committer()
+ await wafc.tokens.add_token(
+ session.uid,
+ token_hash,
+ created,
+ expires,
+ add_form.label or None,
+ )
+
+ await web.flash_success(
+ htm.p[
+ htm.strong["Your new token"],
+ " is ",
+ htm.code(".bg-light.border.rounded.px-1")[plaintext],
+ ],
+ htm.p(".mb-0")["Copy it now as you will not be able to see it again."],
+ )
+ return await session.redirect(get.tokens.tokens)
+
+
+async def _delete_token(session: web.Committer, delete_form: shared.tokens.DeleteTokenForm) -> web.WerkzeugResponse:
+ async with storage.write(session) as write:
+ wafc = write.as_foundation_committer()
+ await wafc.tokens.delete_token(delete_form.token_id)
+ await quart.flash("Token deleted successfully", "success")
+ return await session.redirect(get.tokens.tokens)
diff --git a/atr/shared/tokens.py b/atr/shared/tokens.py
index 28f8764..3bb9c9c 100644
--- a/atr/shared/tokens.py
+++ b/atr/shared/tokens.py
@@ -15,289 +15,34 @@
# specific language governing permissions and limitations
# under the License.
+from typing import Annotated, Literal
-import datetime
-import hashlib
-import secrets
-import time
-from typing import Final
+import pydantic
-import markupsafe
-import quart
-import sqlmodel
-import werkzeug.datastructures as datastructures
-import wtforms.fields.core as core
+import atr.form as form
-import atr.db as db
-import atr.forms as forms
-import atr.htm as htm
-import atr.jwtoken as jwtoken
-import atr.log as log
-import atr.models.sql as sql
-import atr.storage as storage
-import atr.template as templates
-import atr.util as util
-import atr.web as web
+type ADD_TOKEN = Literal["add_token"]
+type DELETE_TOKEN = Literal["delete_token"]
-_EXPIRY_DAYS: Final[int] = 180
+class AddTokenForm(form.Form):
+ variant: ADD_TOKEN = form.value(ADD_TOKEN)
+ label: str = form.label("Label")
-type Fragment = htm.Element | core.Field | str
+ @pydantic.field_validator("label", mode="after")
+ @classmethod
+ def validate_label_length(cls, value: str) -> str:
+ if len(value) > 100:
+ raise ValueError("Label must be 100 characters or less")
+ return value
-class AddTokenForm(forms.Typed):
- label = forms.string("Label", optional=True, validators=forms.length(max=100), placeholder="E.g. CI bot")
- submit = forms.submit("Generate token")
+class DeleteTokenForm(form.Form):
+ variant: DELETE_TOKEN = form.value(DELETE_TOKEN)
+ token_id: form.Int = form.label("Token ID", widget=form.Widget.HIDDEN)
-class DeleteTokenForm(forms.Typed):
- token_id = forms.hidden()
- submit = forms.submit("Delete")
-
-
-class IssueJWTForm(forms.Typed):
- submit = forms.submit("Generate JWT")
-
-
-async def tokens(session: web.Committer) -> str | web.WerkzeugResponse:
- request_form = await quart.request.form
-
- if is_post := quart.request.method == "POST":
- maybe_response = await _handle_post(session, request_form)
- if maybe_response is not None:
- return maybe_response
-
- add_form = await AddTokenForm.create_form(data=request_form if is_post else None)
- issue_form = await IssueJWTForm.create_form(data=request_form if is_post else None)
-
- start = time.perf_counter_ns()
- async with storage.read_as_foundation_committer() as rafc:
- tokens_list = await rafc.tokens.own_personal_access_tokens()
- most_recent_pat = await rafc.tokens.most_recent_jwt_pat()
- end = time.perf_counter_ns()
- log.info("Tokens list fetched in %dms", (end - start) / 1_000_000)
-
- start = time.perf_counter_ns()
- add_form_elem = _build_add_form_element(add_form)
- issue_form_elem = _build_issue_jwt_form_element(issue_form)
- tokens_table = _build_tokens_table(tokens_list)
-
- issue_jwt = [
- htm.p[
- """Generate a JSON Web Token (JWT) to authenticate calls to ATR's
- private API routes. Treat the token like a password and include it
- in the Authorization header as a Bearer token when invoking the
- protected endpoints."""
- # p["Example"],
- ],
- issue_form_elem,
- htm.pre(id="jwt-output", class_="d-none mt-2 p-3 atr-word-wrap border rounded w-50"),
- ]
-
- if most_recent_pat and most_recent_pat.last_used:
- issue_jwt.append(
- htm.p(".mt-3")[
- "You most recently used a PAT to issue a JWT at ",
- htm.strong[util.format_datetime(most_recent_pat.last_used) + "Z"],
- ", using the PAT labelled ",
- htm.code[most_recent_pat.label or "[Untitled]"],
- ".",
- ]
- )
-
- content_elem = htm.div[
- htm.h1["Tokens"],
- htm.h2["Personal Access Tokens (PATs)"],
- htm.p[
- """Generate tokens for API access. For security, the plaintext
- token is shown only once when you create it. You can revoke tokens
- you no longer need."""
- ],
- htm.div(".card.mb-4")[
- htm.div(".card-header")["Generate new token"],
- htm.div(".card-body")[add_form_elem],
- ],
- tokens_table,
- htm.h2["JSON Web Token (JWT)"],
- htm.div[issue_jwt],
- ]
- end = time.perf_counter_ns()
- log.info("Content elem built in %dms", (end - start) / 1_000_000)
-
- start = time.perf_counter_ns()
- rendered = await templates.render(
- "blank.html",
- title="Tokens",
- description="Manage your PATs and JWTs.",
- content=content_elem,
- javascripts=[util.static_path("js", "create-a-jwt.js")],
- )
- end = time.perf_counter_ns()
- log.info("Rendered in %dms", (end - start) / 1_000_000)
-
- return rendered
-
-
-def _as_markup(fragment: Fragment) -> markupsafe.Markup:
- return markupsafe.Markup(str(fragment))
-
-
-def _build_add_form_element(a_form: AddTokenForm) -> markupsafe.Markup:
- import atr.post as post
-
- elem = htm.form(method="post", action=util.as_url(post.tokens.tokens))[
- _as_markup(a_form.csrf_token),
- htm.div(".mb-3")[
- a_form.label.label,
- a_form.label(class_="form-control"),
- ],
- a_form.submit(class_="btn btn-primary"),
- ]
- return _as_markup(elem)
-
-
-def _build_delete_form_element(token_id: int | None) -> markupsafe.Markup:
- import atr.post as post
-
- d_form = DeleteTokenForm()
- d_form.token_id.data = "" if token_id is None else str(token_id)
- elem = htm.form(".mb-0", method="post", action=util.as_url(post.tokens.tokens))[
- _as_markup(d_form.csrf_token),
- _as_markup(d_form.token_id),
- d_form.submit(class_="btn btn-sm btn-danger"),
- ]
- return _as_markup(elem)
-
-
-def _build_issue_jwt_form_element(j_form: IssueJWTForm) -> markupsafe.Markup:
- import atr.post as post
-
- elem = htm.form("#issue-jwt-form", method="post", action=util.as_url(post.tokens.jwt_post))[
- _as_markup(j_form.csrf_token),
- j_form.submit(class_="btn btn-primary"),
- ]
- return _as_markup(elem)
-
-
-def _build_tokens_table(tokens_list: list[sql.PersonalAccessToken]) -> markupsafe.Markup:
- if not tokens_list:
- return _as_markup(htm.p["No tokens found."])
-
- rows = [
- htm.tr(".align-middle")[
- htm.td[t.label or ""],
- htm.td[util.format_datetime(t.created)],
- htm.td[util.format_datetime(t.expires)],
- htm.td[util.format_datetime(t.last_used) if t.last_used else "Never"],
- htm.td[_build_delete_form_element(t.id)],
- ]
- for t in tokens_list
- ]
-
- table_elem = htm.table(".table.table-striped")[
- htm.thead[
- htm.tr[
- htm.th["Label"],
- htm.th["Created"],
- htm.th["Expires"],
- htm.th["Last used"],
- htm.th[""],
- ]
- ],
- htm.tbody[rows],
- ]
- return _as_markup(table_elem)
-
-
-async def _create_token(uid: str, label: str | None) -> str:
- plaintext = secrets.token_urlsafe(32)
- token_hash = hashlib.sha3_256(plaintext.encode()).hexdigest()
- created = datetime.datetime.now(datetime.UTC)
- expires = created + datetime.timedelta(days=_EXPIRY_DAYS)
-
- async with storage.write() as write:
- wafc = write.as_foundation_committer()
- await wafc.tokens.add_token(uid, token_hash, created, expires, label)
- return plaintext
-
-
[email protected]_commit_function
-async def _delete_token(data: db.Session, uid: str, token_id: int) -> None:
- pat = await data.query_one_or_none(
- sqlmodel.select(sql.PersonalAccessToken).where(
- sql.PersonalAccessToken.id == token_id,
- sql.PersonalAccessToken.asfuid == uid,
- )
- )
- if pat:
- await data.delete(pat)
-
-
-async def _handle_post(session: web.Committer, request_form: datastructures.MultiDict) -> web.WerkzeugResponse | None:
- if "token_id" in request_form:
- return await _handle_delete_token_post(session, request_form)
-
- if "label" in request_form:
- return await _handle_add_token_post(session, request_form)
-
- return await _handle_issue_jwt_post(session, request_form)
-
-
-async def _handle_add_token_post(
- session: web.Committer, request_form: datastructures.MultiDict
-) -> web.WerkzeugResponse | None:
- import atr.get as get
-
- add_form = await AddTokenForm.create_form(data=request_form)
- if await add_form.validate_on_submit():
- label_val = str(add_form.label.data) if add_form.label.data else None
- plaintext = await _create_token(session.uid, label_val)
- success_msg = htm.div[
- htm.p[
- htm.strong["Your new token"],
- " is ",
- htm.code(".bg-light.border.rounded.px-1")[plaintext],
- ],
- htm.p(".mb-0")["Copy it now as you will not be able to see it again."],
- ]
- await quart.flash(_as_markup(success_msg), "success")
- return await session.redirect(get.tokens.tokens)
-
- return None
-
-
-async def _handle_delete_token_post(
- session: web.Committer, request_form: datastructures.MultiDict
-) -> web.WerkzeugResponse | None:
- import atr.get as get
-
- del_form = await DeleteTokenForm.create_form(data=request_form)
- if await del_form.validate_on_submit():
- token_id_val = int(str(del_form.token_id.data))
- await _delete_token(session.uid, token_id_val)
- await quart.flash("Token deleted successfully", "success")
- return await session.redirect(get.tokens.tokens)
-
- await quart.flash("Invalid delete request", "error")
- return None
-
-
-async def _handle_issue_jwt_post(
- session: web.Committer, request_form: datastructures.MultiDict
-) -> web.WerkzeugResponse | None:
- import atr.get as get
-
- issue_form = await IssueJWTForm.create_form(data=request_form)
- if await issue_form.validate_on_submit():
- jwt_token = jwtoken.issue(session.uid)
- success_msg = htm.div[
- htm.p[
- htm.strong["Your new JWT"],
- " is:",
- ],
- htm.p[htm.code(".bg-light.border.rounded.px-1.atr-word-wrap")[jwt_token],],
- ]
- await quart.flash(_as_markup(success_msg), "success")
- return await session.redirect(get.tokens.tokens)
-
- return None
+type TokenForm = Annotated[
+ AddTokenForm | DeleteTokenForm,
+ form.DISCRIMINATOR,
+]
diff --git a/atr/storage/writers/tokens.py b/atr/storage/writers/tokens.py
index 6250743..e3ca4cf 100644
--- a/atr/storage/writers/tokens.py
+++ b/atr/storage/writers/tokens.py
@@ -67,6 +67,21 @@ class FoundationCommitter(GeneralPublic):
await self.__data.commit()
return pat
+ async def delete_token(self, token_id: int) -> None:
+ pat = await self.__data.query_one_or_none(
+ sqlmodel.select(sql.PersonalAccessToken).where(
+ sql.PersonalAccessToken.id == token_id,
+ sql.PersonalAccessToken.asfuid == self.__asf_uid,
+ )
+ )
+ if pat is not None:
+ await self.__data.delete(pat)
+ await self.__data.commit()
+ self.__write_as.append_to_audit_log(
+ asf_uid=self.__asf_uid,
+ token_id=token_id,
+ )
+
async def issue_jwt(self, pat_text: str) -> str:
pat_hash = hashlib.sha3_256(pat_text.encode()).hexdigest()
pat = await self.__data.query_one_or_none(
diff --git a/atr/web.py b/atr/web.py
index da2c0b5..33b7cff 100644
--- a/atr/web.py
+++ b/atr/web.py
@@ -23,6 +23,7 @@ from typing import TYPE_CHECKING, Any, Protocol, TypeVar
import asfquart.base as base
import asfquart.session as session
+import markupsafe
import pydantic_core
import quart
import werkzeug.datastructures.headers
@@ -278,6 +279,22 @@ class ZipResponse(quart.Response):
super().__init__(response, status=status, headers=raw_headers, mimetype="application/zip")
+async def flash_error(*messages: htm.Element) -> None:
+ div = htm.Block(htm.div, classes=".atr-initial")
+ for message in messages:
+ div.append(message)
+
+ await quart.flash(markupsafe.Markup(str(div.collect())), category="error")
+
+
+async def flash_success(*messages: htm.Element) -> None:
+ div = htm.Block(htm.div, classes=".atr-initial")
+ for message in messages:
+ div.append(message)
+
+ await quart.flash(markupsafe.Markup(str(div.collect())), category="success")
+
+
async def form_error(error: str) -> None:
pass
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]