This is an automated email from the ASF dual-hosted git repository.

sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git


The following commit(s) were added to refs/heads/main by this push:
     new 30eef68  Use a wrapper for plain text responses
30eef68 is described below

commit 30eef6845e9450ab4b59fdd4beed080d527842a3
Author: Sean B. Palmer <[email protected]>
AuthorDate: Mon Oct 27 19:24:10 2025 +0000

    Use a wrapper for plain text responses
---
 atr/admin/__init__.py  |  18 ++++----
 atr/routes/download.py |  13 +++---
 atr/routes/draft.py    |   3 +-
 atr/routes/keys.py     |   5 ++-
 atr/routes/preview.py  |   7 ++--
 atr/routes/tokens.py   | 109 +++++++++++++++++++++----------------------------
 atr/web.py             |   5 +++
 7 files changed, 77 insertions(+), 83 deletions(-)

diff --git a/atr/admin/__init__.py b/atr/admin/__init__.py
index 676d572..dd23ae7 100644
--- a/atr/admin/__init__.py
+++ b/atr/admin/__init__.py
@@ -190,7 +190,7 @@ async def configuration(session: web.Committer) -> 
quart.wrappers.response.Respo
         values.append(f"{name}={val}")
 
     values.sort()
-    return quart.Response("\n".join(values), mimetype="text/plain")
+    return web.TextResponse("\n".join(values))
 
 
 @admin.get("/consistency")
@@ -433,7 +433,7 @@ async def env(session: web.Committer) -> 
quart.wrappers.response.Response:
     env_vars = []
     for key, value in os.environ.items():
         env_vars.append(f"{key}={value}")
-    return quart.Response("\n".join(env_vars), mimetype="text/plain")
+    return web.TextResponse("\n".join(env_vars))
 
 
 @admin.get("/keys/check")
@@ -455,10 +455,10 @@ async def _keys_check(session: web.Committer) -> 
quart.Response:
 
     try:
         result = await _check_keys()
-        return quart.Response(result, mimetype="text/plain")
+        return web.TextResponse(result)
     except Exception as e:
         log.exception("Exception during key check:")
-        return quart.Response(f"Exception during key check: {e!s}", 
mimetype="text/plain")
+        return web.TextResponse(f"Exception during key check: {e!s}")
 
 
 @admin.get("/keys/regenerate-all")
@@ -496,7 +496,7 @@ async def _keys_regenerate_all(session: web.Committer) -> 
quart.Response:
     for oce in outcomes.errors():
         response_lines.append(f"Error regenerating: {type(oce).__name__} 
{oce}")
 
-    return quart.Response("\n".join(response_lines), mimetype="text/plain")
+    return web.TextResponse("\n".join(response_lines))
 
 
 @admin.get("/keys/update")
@@ -598,10 +598,10 @@ async def _ongoing_tasks(
 ) -> quart.wrappers.response.Response:
     try:
         ongoing = await interaction.tasks_ongoing(project_name, version_name, 
revision)
-        return quart.Response(str(ongoing), mimetype="text/plain")
+        return web.TextResponse(str(ongoing))
     except Exception:
         log.exception(f"Error fetching ongoing task count for {project_name} 
{version_name} rev {revision}:")
-        return quart.Response("", mimetype="text/plain")
+        return web.TextResponse("")
 
 
 @admin.get("/performance")
@@ -736,7 +736,7 @@ async def task_times(
             ms_elapsed = (task.completed - task.started).total_seconds() * 1000
             values.append(f"{task.task_type} {ms_elapsed:.2f}ms")
 
-    return quart.Response("\n".join(values), mimetype="text/plain")
+    return web.TextResponse("\n".join(values))
 
 
 @admin.get("/test")
@@ -769,7 +769,7 @@ async def test(session: web.Committer) -> 
quart.wrappers.response.Response:
     log.info(f"Inserted: {inserted_count}")
     log.info(f"Linked: {linked_count}")
     log.info(f"InsertedAndLinked: {inserted_and_linked_count}")
-    return quart.Response(str(wacm), mimetype="text/plain")
+    return web.TextResponse(str(wacm))
 
 
 @admin.get("/toggle-view")
diff --git a/atr/routes/download.py b/atr/routes/download.py
index 8a8614d..1dbb5c5 100644
--- a/atr/routes/download.py
+++ b/atr/routes/download.py
@@ -37,6 +37,7 @@ import atr.routes.mapping as mapping
 import atr.routes.root as root
 import atr.template as template
 import atr.util as util
+import atr.web as web
 
 
 @route.committer("/download/all/<project_name>/<version_name>")
@@ -109,11 +110,11 @@ async def urls_selected(
                 ValueError("Release not found")
             )
         url_list_str = await _generate_file_url_list(release)
-        return quart.Response(url_list_str, mimetype="text/plain")
+        return web.TextResponse(url_list_str)
     except ValueError as e:
-        return quart.Response(f"Error: {e}", status=404, mimetype="text/plain")
+        return web.TextResponse(f"Error: {e}", status=404)
     except Exception as e:
-        return quart.Response(f"Internal server error: {e}", status=500, 
mimetype="text/plain")
+        return web.TextResponse(f"Internal server error: {e}", status=500)
 
 
 @route.committer("/download/zip/<project_name>/<version_name>")
@@ -123,9 +124,9 @@ async def zip_selected(
     try:
         release = await session.release(project_name=project_name, 
version_name=version_name, phase=None)
     except ValueError as e:
-        return quart.Response(f"Error: {e}", status=404, mimetype="text/plain")
+        return web.TextResponse(f"Error: {e}", status=404)
     except Exception as e:
-        return quart.Response(f"Server error: {e}", status=500, 
mimetype="text/plain")
+        return web.TextResponse(f"Server error: {e}", status=500)
 
     base_dir = util.release_directory(release)
     files_to_zip = []
@@ -135,7 +136,7 @@ async def zip_selected(
             if await aiofiles.os.path.isfile(full_item_path):
                 files_to_zip.append({"file": str(full_item_path), "name": 
str(rel_path)})
     except FileNotFoundError:
-        return quart.Response("Error: Release directory not found.", 
status=404, mimetype="text/plain")
+        return web.TextResponse("Error: Release directory not found.", 
status=404)
 
     async def stream_zip(file_list: list[dict[str, str]]) -> 
AsyncGenerator[bytes]:
         aiozip = zipstream.AioZipStream(file_list, chunksize=32768)
diff --git a/atr/routes/draft.py b/atr/routes/draft.py
index a6a944f..f1b6932 100644
--- a/atr/routes/draft.py
+++ b/atr/routes/draft.py
@@ -39,6 +39,7 @@ import atr.routes.upload as upload
 import atr.storage as storage
 import atr.template as template
 import atr.util as util
+import atr.web as web
 
 if TYPE_CHECKING:
     import werkzeug.wrappers.response as response
@@ -412,4 +413,4 @@ async def vote_preview(
             vote_duration=vote_duration,
         ),
     )
-    return quart.Response(body, mimetype="text/plain")
+    return web.TextResponse(body)
diff --git a/atr/routes/keys.py b/atr/routes/keys.py
index 738a6b0..6cbda29 100644
--- a/atr/routes/keys.py
+++ b/atr/routes/keys.py
@@ -42,6 +42,7 @@ import atr.storage.types as types
 import atr.template as template
 import atr.user as user
 import atr.util as util
+import atr.web as web
 
 
 class AddOpenPGPKeyForm(forms.Typed):
@@ -282,13 +283,13 @@ async def details(session: route.CommitterSession, 
fingerprint: str) -> str | re
 
 
 @route.committer("/keys/export/<committee_name>")
-async def export(session: route.CommitterSession, committee_name: str) -> 
quart.Response:
+async def export(session: route.CommitterSession, committee_name: str) -> 
web.TextResponse:
     """Export a KEYS file for a specific committee."""
     async with storage.write() as write:
         wafc = write.as_foundation_committer()
         keys_file_text = await wafc.keys.keys_file_text(committee_name)
 
-    return quart.Response(keys_file_text, mimetype="text/plain")
+    return web.TextResponse(keys_file_text)
 
 
 @route.committer("/keys/import/<project_name>/<version_name>", 
methods=["POST"])
diff --git a/atr/routes/preview.py b/atr/routes/preview.py
index 87889e7..e29e130 100644
--- a/atr/routes/preview.py
+++ b/atr/routes/preview.py
@@ -30,6 +30,7 @@ import atr.routes.root as root
 import atr.storage as storage
 import atr.template as template
 import atr.util as util
+import atr.web as web
 
 if asfquart.APP is ...:
     raise RuntimeError("APP is not set")
@@ -64,7 +65,7 @@ async def announce_preview(
         if form.errors:
             error_details = "; ".join([f"{field}: {', '.join(errs)}" for 
field, errs in form.errors.items()])
             error_message = f"{error_message}: {error_details}"
-        return quart.Response(f"Error: {error_message}", status=400, 
mimetype="text/plain")
+        return web.TextResponse(f"Error: {error_message}", status=400)
 
     try:
         # Construct options and generate body
@@ -76,11 +77,11 @@ async def announce_preview(
         )
         preview_body = await 
construct.announce_release_body(str(form.body.data), options)
 
-        return quart.Response(preview_body, mimetype="text/plain")
+        return web.TextResponse(preview_body)
 
     except Exception as e:
         log.exception("Error generating announcement preview:")
-        return quart.Response(f"Error generating preview: {e!s}", status=500, 
mimetype="text/plain")
+        return web.TextResponse(f"Error generating preview: {e!s}", status=500)
 
 
 @route.committer("/preview/delete", methods=["POST"])
diff --git a/atr/routes/tokens.py b/atr/routes/tokens.py
index 7250828..55f2ca0 100644
--- a/atr/routes/tokens.py
+++ b/atr/routes/tokens.py
@@ -28,26 +28,10 @@ import sqlmodel
 import werkzeug.datastructures as datastructures
 import werkzeug.wrappers.response as response
 import wtforms.fields.core as core
-from htpy import (
-    Element,
-    code,
-    div,
-    form,
-    h1,
-    h2,
-    p,
-    pre,
-    strong,
-    table,
-    tbody,
-    td,
-    th,
-    thead,
-    tr,
-)
 
 import atr.db as db
 import atr.forms as forms
+import atr.htm as htm
 import atr.jwtoken as jwtoken
 import atr.log as log
 import atr.models.sql as sql
@@ -55,11 +39,12 @@ import atr.route as route
 import atr.storage as storage
 import atr.template as templates
 import atr.util as util
+import atr.web as web
 
 _EXPIRY_DAYS: Final[int] = 180
 
 
-type Fragment = Element | core.Field | str
+type Fragment = htm.Element | core.Field | str
 
 
 class AddTokenForm(forms.Typed):
@@ -81,7 +66,7 @@ async def jwt_post(session: route.CommitterSession) -> 
quart.Response:
     await util.validate_empty_form()
 
     jwt_token = jwtoken.issue(session.uid)
-    return quart.Response(jwt_token, mimetype="text/plain")
+    return web.TextResponse(jwt_token)
 
 
 @route.committer("/tokens", methods=["GET", "POST"])
@@ -109,7 +94,7 @@ async def tokens(session: route.CommitterSession) -> str | 
response.Response:
     tokens_table = _build_tokens_table(tokens_list)
 
     issue_jwt = [
-        p[
+        htm.p[
             """Generate a JSON Web Token (JWT) to authenticate calls to ATR's
             private API routes. Treat the token like a password and include it
             in the Authorization header as a Bearer token when invoking the
@@ -117,35 +102,35 @@ async def tokens(session: route.CommitterSession) -> str 
| response.Response:
             # p["Example"],
         ],
         issue_form_elem,
-        pre(id="jwt-output", class_="d-none mt-2 p-3 atr-word-wrap border 
rounded w-50"),
+        htm.pre(id="jwt-output", class_="d-none mt-2 p-3 atr-word-wrap border 
rounded w-50"),
     ]
 
     if most_recent_pat and most_recent_pat.last_used:
         issue_jwt.append(
-            p(".mt-3")[
+            htm.p(".mt-3")[
                 "You most recently used a PAT to issue a JWT at ",
-                strong[util.format_datetime(most_recent_pat.last_used) + "Z"],
+                htm.strong[util.format_datetime(most_recent_pat.last_used) + 
"Z"],
                 ", using the PAT labelled ",
-                code[most_recent_pat.label or "[Untitled]"],
+                htm.code[most_recent_pat.label or "[Untitled]"],
                 ".",
             ]
         )
 
-    content_elem = div[
-        h1["Tokens"],
-        h2["Personal Access Tokens (PATs)"],
-        p[
+    content_elem = htm.div[
+        htm.h1["Tokens"],
+        htm.h2["Personal Access Tokens (PATs)"],
+        htm.p[
             """Generate tokens for API access. For security, the plaintext
             token is shown only once when you create it. You can revoke tokens
             you no longer need."""
         ],
-        div(".card.mb-4")[
-            div(".card-header")["Generate new token"],
-            div(".card-body")[add_form_elem],
+        htm.div(".card.mb-4")[
+            htm.div(".card-header")["Generate new token"],
+            htm.div(".card-body")[add_form_elem],
         ],
         tokens_table,
-        h2["JSON Web Token (JWT)"],
-        div[issue_jwt],
+        htm.h2["JSON Web Token (JWT)"],
+        htm.div[issue_jwt],
     ]
     end = time.perf_counter_ns()
     log.info("Content elem built in %dms", (end - start) / 1_000_000)
@@ -169,9 +154,9 @@ def _as_markup(fragment: Fragment) -> markupsafe.Markup:
 
 
 def _build_add_form_element(a_form: AddTokenForm) -> markupsafe.Markup:
-    elem = form(method="post", action=util.as_url(tokens))[
+    elem = htm.form(method="post", action=util.as_url(tokens))[
         _as_markup(a_form.csrf_token),
-        div(".mb-3")[
+        htm.div(".mb-3")[
             a_form.label.label,
             a_form.label(class_="form-control"),
         ],
@@ -183,7 +168,7 @@ def _build_add_form_element(a_form: AddTokenForm) -> 
markupsafe.Markup:
 def _build_delete_form_element(token_id: int | None) -> markupsafe.Markup:
     d_form = DeleteTokenForm()
     d_form.token_id.data = "" if token_id is None else str(token_id)
-    elem = form(".mb-0", method="post", action=util.as_url(tokens))[
+    elem = htm.form(".mb-0", method="post", action=util.as_url(tokens))[
         _as_markup(d_form.csrf_token),
         _as_markup(d_form.token_id),
         d_form.submit(class_="btn btn-sm btn-danger"),
@@ -192,7 +177,7 @@ def _build_delete_form_element(token_id: int | None) -> 
markupsafe.Markup:
 
 
 def _build_issue_jwt_form_element(j_form: IssueJWTForm) -> markupsafe.Markup:
-    elem = form("#issue-jwt-form", method="post", 
action=util.as_url(jwt_post))[
+    elem = htm.form("#issue-jwt-form", method="post", 
action=util.as_url(jwt_post))[
         _as_markup(j_form.csrf_token),
         j_form.submit(class_="btn btn-primary"),
     ]
@@ -201,30 +186,30 @@ def _build_issue_jwt_form_element(j_form: IssueJWTForm) 
-> markupsafe.Markup:
 
 def _build_tokens_table(tokens_list: list[sql.PersonalAccessToken]) -> 
markupsafe.Markup:
     if not tokens_list:
-        return _as_markup(p["No tokens found."])
+        return _as_markup(htm.p["No tokens found."])
 
     rows = [
-        tr(".align-middle")[
-            td[t.label or ""],
-            td[util.format_datetime(t.created)],
-            td[util.format_datetime(t.expires)],
-            td[util.format_datetime(t.last_used) if t.last_used else "Never"],
-            td[_build_delete_form_element(t.id)],
+        htm.tr(".align-middle")[
+            htm.td[t.label or ""],
+            htm.td[util.format_datetime(t.created)],
+            htm.td[util.format_datetime(t.expires)],
+            htm.td[util.format_datetime(t.last_used) if t.last_used else 
"Never"],
+            htm.td[_build_delete_form_element(t.id)],
         ]
         for t in tokens_list
     ]
 
-    table_elem = table(".table.table-striped")[
-        thead[
-            tr[
-                th["Label"],
-                th["Created"],
-                th["Expires"],
-                th["Last used"],
-                th[""],
+    table_elem = htm.table(".table.table-striped")[
+        htm.thead[
+            htm.tr[
+                htm.th["Label"],
+                htm.th["Created"],
+                htm.th["Expires"],
+                htm.th["Last used"],
+                htm.th[""],
             ]
         ],
-        tbody[rows],
+        htm.tbody[rows],
     ]
     return _as_markup(table_elem)
 
@@ -272,13 +257,13 @@ async def _handle_add_token_post(
     if await add_form.validate_on_submit():
         label_val = str(add_form.label.data) if add_form.label.data else None
         plaintext = await _create_token(session.uid, label_val)
-        success_msg = div[
-            p[
-                strong["Your new token"],
+        success_msg = htm.div[
+            htm.p[
+                htm.strong["Your new token"],
                 " is ",
-                code(".bg-light.border.rounded.px-1")[plaintext],
+                htm.code(".bg-light.border.rounded.px-1")[plaintext],
             ],
-            p(".mb-0")["Copy it now as you will not be able to see it again."],
+            htm.p(".mb-0")["Copy it now as you will not be able to see it 
again."],
         ]
         await quart.flash(_as_markup(success_msg), "success")
         return await session.redirect(tokens)
@@ -306,12 +291,12 @@ async def _handle_issue_jwt_post(
     issue_form = await IssueJWTForm.create_form(data=request_form)
     if await issue_form.validate_on_submit():
         jwt_token = jwtoken.issue(session.uid)
-        success_msg = div[
-            p[
-                strong["Your new JWT"],
+        success_msg = htm.div[
+            htm.p[
+                htm.strong["Your new JWT"],
                 " is:",
             ],
-            p[code(".bg-light.border.rounded.px-1.atr-word-wrap")[jwt_token],],
+            
htm.p[htm.code(".bg-light.border.rounded.px-1.atr-word-wrap")[jwt_token],],
         ]
         await quart.flash(_as_markup(success_msg), "success")
         return await session.redirect(tokens)
diff --git a/atr/web.py b/atr/web.py
index 4fec710..1bfa057 100644
--- a/atr/web.py
+++ b/atr/web.py
@@ -180,6 +180,11 @@ class RouteFunction(Protocol[R]):
     def __call__(self, *args: Any, **kwargs: Any) -> Awaitable[R]: ...
 
 
+class TextResponse(quart.Response):
+    def __init__(self, text: str, status: int = 200) -> None:
+        super().__init__(text, status=status, mimetype="text/plain")
+
+
 async def redirect[R](
     route: RouteFunction[R], success: str | None = None, error: str | None = 
None, **kwargs: Any
 ) -> response.Response:


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to