This is an automated email from the ASF dual-hosted git repository.
sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git
The following commit(s) were added to refs/heads/main by this push:
new c8f491b Add a module for processing HTML forms with improved type
safety
c8f491b is described below
commit c8f491bb1e292e1bcaf3723856cdd0675ea3410d
Author: Sean B. Palmer <[email protected]>
AuthorDate: Thu Nov 6 20:46:37 2025 +0000
Add a module for processing HTML forms with improved type safety
---
atr/blueprints/post.py | 63 ++++++
atr/form.py | 513 +++++++++++++++++++++++++++++++++++++++++++++++++
atr/get/test.py | 61 ++++++
atr/models/schema.py | 12 ++
atr/post/__init__.py | 2 +
atr/post/test.py | 69 +++++++
atr/shared/__init__.py | 2 +
atr/shared/test.py | 76 ++++++++
8 files changed, 798 insertions(+)
diff --git a/atr/blueprints/post.py b/atr/blueprints/post.py
index 144bcdf..c539c70 100644
--- a/atr/blueprints/post.py
+++ b/atr/blueprints/post.py
@@ -23,8 +23,10 @@ from typing import Any
import asfquart.auth as auth
import asfquart.base as base
import asfquart.session
+import pydantic
import quart
+import atr.form
import atr.log as log
import atr.web as web
@@ -84,6 +86,67 @@ def committer(path: str) ->
Callable[[web.CommitterRouteFunction[Any]], web.Rout
return decorator
+def empty() -> Callable[[Callable[..., Awaitable[Any]]], Callable[...,
Awaitable[Any]]]:
+ # This means that instead of:
+ #
+ # @post.form(form.Empty)
+ # async def test_empty(
+ # session: web.Committer | None,
+ # form: form.Empty,
+ # ) -> web.WerkzeugResponse:
+ # pass
+ #
+ # We can use:
+ #
+ # @post.empty()
+ # async def test_empty(
+ # session: web.Committer | None,
+ # ) -> web.WerkzeugResponse:
+ # pass
+ def decorator(func: Callable[..., Awaitable[Any]]) -> Callable[...,
Awaitable[Any]]:
+ async def wrapper(session: web.Committer | None, *args: Any, **kwargs:
Any) -> Any:
+ try:
+ form_data = await atr.form.quart_request()
+ context = {"session": session}
+ atr.form.validate(atr.form.Empty, form_data, context)
+ return await func(session, *args, **kwargs)
+ except pydantic.ValidationError as e:
+ # This presumably should not happen
+ log.warning(f"Empty form validation error (CSRF): {e}")
+ await quart.flash("Invalid form submission. Please try
again.", "error")
+ return quart.redirect(quart.request.path)
+
+ wrapper.__name__ = func.__name__
+ wrapper.__doc__ = func.__doc__
+ wrapper.__annotations__ = func.__annotations__.copy()
+ return wrapper
+
+ return decorator
+
+
+def form(
+ form_cls: Any,
+) -> Callable[[Callable[..., Awaitable[Any]]], Callable[..., Awaitable[Any]]]:
+ def decorator(func: Callable[..., Awaitable[Any]]) -> Callable[...,
Awaitable[Any]]:
+ async def wrapper(session: web.Committer | None, *args: Any, **kwargs:
Any) -> Any:
+ try:
+ form_data = await atr.form.quart_request()
+ context = {"session": session}
+ validated_form = atr.form.validate(form_cls, form_data,
context)
+ return await func(session, validated_form, *args, **kwargs)
+ except pydantic.ValidationError as e:
+ log.warning(f"Form validation error: {e}")
+ await quart.flash(f"Validation error: {e}", "error")
+ return quart.redirect(quart.request.path)
+
+ wrapper.__name__ = func.__name__
+ wrapper.__doc__ = func.__doc__
+ wrapper.__annotations__ = func.__annotations__.copy()
+ return wrapper
+
+ return decorator
+
+
def public(path: str) -> Callable[[Callable[..., Awaitable[Any]]],
web.RouteFunction[Any]]:
def decorator(func: Callable[..., Awaitable[Any]]) ->
web.RouteFunction[Any]:
async def wrapper(*args: Any, **kwargs: Any) -> Any:
diff --git a/atr/form.py b/atr/form.py
new file mode 100644
index 0000000..d6754d3
--- /dev/null
+++ b/atr/form.py
@@ -0,0 +1,513 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import enum
+import types
+from typing import TYPE_CHECKING, Annotated, Any, Final, Literal, get_args,
get_origin
+
+import htpy
+import pydantic
+import pydantic.functional_validators as functional_validators
+import quart
+import quart.datastructures as datastructures
+import quart_wtf.utils as utils
+
+import atr.htm as htm
+import atr.models.schema as schema
+
+if TYPE_CHECKING:
+ from collections.abc import Iterator
+
+ import markupsafe
+
+ import atr.web as web
+
+DISCRIMINATOR_NAME: Final[str] = "variant"
+DISCRIMINATOR: Final[Any] = schema.discriminator(DISCRIMINATOR_NAME)
+
+
+class Form(schema.Form):
+ pass
+
+
+class Empty(Form):
+ pass
+
+
+class Widget(enum.Enum):
+ CHECKBOX = "checkbox"
+ CHECKBOXES = "checkboxes"
+ EMAIL = "email"
+ FILE = "file"
+ FILES = "files"
+ NUMBER = "number"
+ RADIO = "radio"
+ SELECT = "select"
+ TEXT = "text"
+ TEXTAREA = "textarea"
+ URL = "url"
+
+
+def label(description: str, *, default: Any = ..., widget: Widget | None =
None) -> Any:
+ extra: dict[str, Any] = {"widget": widget.value} if widget else {}
+ return pydantic.Field(default, description=description,
json_schema_extra=extra)
+
+
+def session(info: pydantic.ValidationInfo) -> web.Committer | None:
+ ctx: dict[str, Any] = info.context or {}
+ return ctx.get("session")
+
+
+async def quart_request() -> dict[str, Any]:
+ form_data = await quart.request.form
+ files_data = await quart.request.files
+
+ combined_data = {}
+ for key in form_data.keys():
+ # This is a compromise
+ # Some things expect single values, and some expect lists
+ values = form_data.getlist(key)
+ if len(values) == 1:
+ combined_data[key] = values[0]
+ else:
+ combined_data[key] = values
+
+ files_by_name: dict[str, list[datastructures.FileStorage]] = {}
+ for key in files_data.keys():
+ file_list = files_data.getlist(key)
+ # When no files are uploaded, the browser may supply a file with an
empty filename
+ # We filter that out here
+ non_empty_files = [f for f in file_list if f.filename]
+ if non_empty_files:
+ files_by_name[key] = non_empty_files
+
+ for key, file_list in files_by_name.items():
+ if key in combined_data:
+ raise ValueError(f"Files key {key} already exists in form data")
+ combined_data[key] = file_list
+
+ return combined_data
+
+
+async def render_columns(
+ model_cls: type[Form],
+ action: str | None = None,
+ form_classes: str = ".atr-canary",
+ submit_classes: str = "btn-primary",
+ submit_label: str = "Submit",
+ defaults: dict[str, Any] | None = None,
+ errors: dict[str, list[str]] | None = None,
+) -> htm.Element:
+ if action is None:
+ action = quart.request.path
+
+ label_classes = "col-sm-3 col-form-label text-sm-end"
+
+ field_rows: list[htm.Element] = []
+ hidden_fields: list[htm.Element | htm.VoidElement | markupsafe.Markup] = []
+
+ csrf_token = utils.generate_csrf()
+ hidden_fields.append(htpy.input(type="hidden", name="csrf_token",
value=csrf_token))
+
+ for field_name, field_info in model_cls.model_fields.items():
+ if field_name == "csrf_token":
+ continue
+
+ if defaults:
+ field_value = defaults.get(field_name)
+ elif not field_info.is_required():
+ # Use the Pydantic default if no user default are provided
+ field_value = field_info.get_default(call_default_factory=True)
+ else:
+ field_value = None
+ field_errors = errors.get(field_name) if errors else None
+
+ if (field_name == DISCRIMINATOR_NAME) and (field_info.default is not
None):
+ default_value = field_info.default
+ hidden_fields.append(htpy.input(type="hidden",
name=DISCRIMINATOR_NAME, value=default_value))
+ continue
+
+ label_text = field_info.description or field_name.replace("_", "
").title()
+ is_required = field_info.is_required()
+
+ label_elem = htpy.label(for_=field_name,
class_=label_classes)[label_text]
+
+ widget_elem = _render_widget(
+ field_name=field_name,
+ field_info=field_info,
+ field_value=field_value,
+ field_errors=field_errors,
+ is_required=is_required,
+ )
+
+ row_div = htm.div(".mb-3.row")
+ widget_div = htm.div(".col-sm-8")
+ field_rows.append(row_div[label_elem, widget_div[widget_elem]])
+
+ form_children: list[htm.Element | htm.VoidElement | markupsafe.Markup] =
hidden_fields + field_rows
+
+ submit_button = htpy.button(type="submit", class_=f"btn
{submit_classes}")[submit_label]
+ submit_div = htm.div(".col-sm-9.offset-sm-3")
+ submit_row = htm.div(".row")[submit_div[submit_button]]
+ form_children.append(submit_row)
+
+ return htm.form(form_classes, action=action, method="post",
enctype="multipart/form-data")[form_children]
+
+
+def to_bool(v: Any) -> bool:
+ if isinstance(v, bool):
+ return v
+ if v == "on":
+ return True
+ raise ValueError(f"Cannot convert {v!r} to boolean")
+
+
+def to_enum_set[EnumType: enum.Enum](v: Any, enum_class: type[EnumType]) ->
set[EnumType]:
+ members: dict[str, EnumType] = {member.value: member for member in
enum_class}
+ if isinstance(v, set):
+ return {item for item in v if isinstance(item, enum_class)}
+ if isinstance(v, list):
+ return {members[item] for item in v if item in members}
+ if isinstance(v, str):
+ if v in members:
+ return {members[v]}
+ raise ValueError(f"Invalid enum value: {v!r}")
+ raise ValueError(f"Expected a set of enum values, got {type(v).__name__}")
+
+
+def to_filestorage(v: Any) -> datastructures.FileStorage:
+ if not isinstance(v, datastructures.FileStorage):
+ raise ValueError("Expected an uploaded file")
+ return v
+
+
+def to_filestorage_list(v: Any) -> list[datastructures.FileStorage]:
+ if isinstance(v, list):
+ result = []
+ for item in v:
+ if not isinstance(item, datastructures.FileStorage):
+ raise ValueError("Expected a list of uploaded files")
+ result.append(item)
+ return result
+ if isinstance(v, datastructures.FileStorage):
+ return [v]
+ raise ValueError("Expected a list of uploaded files")
+
+
+def to_int(v: Any) -> int:
+ # if v == "":
+ # return 0
+ try:
+ return int(v)
+ except ValueError:
+ raise ValueError(f"Invalid integer value: {v!r}")
+
+
+# Validator types come before other functions
+# We must not use the "type" keyword here, otherwise Pydantic complains
+
+Bool = Annotated[
+ bool,
+ functional_validators.BeforeValidator(to_bool),
+ pydantic.Field(default=False),
+]
+
+Email = pydantic.EmailStr
+
+File = Annotated[
+ datastructures.FileStorage,
+ functional_validators.BeforeValidator(to_filestorage),
+]
+
+FileList = Annotated[
+ list[datastructures.FileStorage],
+ functional_validators.BeforeValidator(to_filestorage_list),
+ pydantic.Field(default_factory=list),
+]
+
+Int = Annotated[
+ int,
+ functional_validators.BeforeValidator(to_int),
+]
+
+
+class Set[EnumType: enum.Enum]:
+ def __iter__(self) -> Iterator[EnumType]:
+ # For type checkers
+ raise NotImplementedError
+
+ @staticmethod
+ def __class_getitem__(enum_class: type[EnumType]):
+ def validator(v: Any) -> set[EnumType]:
+ return to_enum_set(v, enum_class)
+
+ return Annotated[
+ set[enum_class],
+ functional_validators.BeforeValidator(validator),
+ pydantic.Field(default_factory=set),
+ ]
+
+
+def validate(model_cls: Any, form: dict[str, Any], context: dict[str, Any] |
None = None) -> pydantic.BaseModel:
+ # Since pydantic.TypeAdapter accepts Any, we do the same
+ return pydantic.TypeAdapter(model_cls).validate_python(form,
context=context)
+
+
+def value(type_alias: Any) -> Any:
+ # This is for unwrapping from Literal for discriminators
+ if hasattr(type_alias, "__value__"):
+ type_alias = type_alias.__value__
+ args = get_args(type_alias)
+ if args:
+ return args[0]
+ raise ValueError(f"Cannot extract default value from {type_alias}")
+
+
+def widget(widget_type: Widget) -> Any:
+ return pydantic.Field(..., json_schema_extra={"widget": widget_type.value})
+
+
+def _render_widget( # noqa: C901
+ field_name: str,
+ field_info: pydantic.fields.FieldInfo,
+ field_value: Any,
+ field_errors: list[str] | None,
+ is_required: bool,
+) -> htm.Element | htm.VoidElement:
+ widget_type = _get_widget_type(field_info)
+ widget_classes = _get_widget_classes(widget_type, field_errors)
+
+ base_attrs: dict[str, str] = {"name": field_name, "id": field_name,
"class_": widget_classes}
+
+ elements: list[htm.Element | htm.VoidElement] = []
+
+ match widget_type:
+ case Widget.CHECKBOX:
+ attrs: dict[str, str] = {
+ "type": "checkbox",
+ "name": field_name,
+ "id": field_name,
+ "class_": "form-check-input",
+ }
+ if field_value:
+ attrs["checked"] = ""
+ widget = htpy.input(**attrs)
+
+ case Widget.CHECKBOXES:
+ choices = _get_choices(field_info)
+ if isinstance(field_value, set):
+ selected_values = [item.value for item in field_value]
+ else:
+ selected_values = field_value if isinstance(field_value, list)
else []
+ checkboxes = []
+ for val, label in choices:
+ checkbox_id = f"{field_name}_{val}"
+ checkbox_attrs: dict[str, str] = {
+ "type": "checkbox",
+ "name": field_name,
+ "id": checkbox_id,
+ "value": val,
+ "class_": "form-check-input",
+ }
+ if val in selected_values:
+ checkbox_attrs["checked"] = ""
+ checkbox_input = htpy.input(**checkbox_attrs)
+ checkbox_label = htpy.label(for_=checkbox_id,
class_="form-check-label")[label]
+
checkboxes.append(htpy.div(class_="form-check")[checkbox_input, checkbox_label])
+ elements.extend(checkboxes)
+ widget = htm.div[checkboxes]
+
+ case Widget.EMAIL:
+ attrs = {**base_attrs, "type": "email"}
+ if field_value:
+ attrs["value"] = str(field_value)
+ widget = htpy.input(**attrs)
+
+ case Widget.FILE:
+ widget = htpy.input(type="file", **base_attrs)
+
+ case Widget.FILES:
+ attrs = {**base_attrs, "multiple": ""}
+ widget = htpy.input(type="file", **attrs)
+
+ case Widget.NUMBER:
+ attrs = {**base_attrs, "type": "number"}
+ attrs["value"] = "0" if (field_value is None) else str(field_value)
+ widget = htpy.input(**attrs)
+
+ case Widget.RADIO:
+ choices = _get_choices(field_info)
+ radios = []
+ for val, label in choices:
+ radio_id = f"{field_name}_{val}"
+ radio_attrs: dict[str, str] = {
+ "type": "radio",
+ "name": field_name,
+ "id": radio_id,
+ "value": val,
+ "class_": "form-check-input",
+ }
+ if is_required:
+ radio_attrs["required"] = ""
+ if val == field_value:
+ radio_attrs["checked"] = ""
+ radio_input = htpy.input(**radio_attrs)
+ radio_label = htpy.label(for_=radio_id,
class_="form-check-label")[label]
+ radios.append(htpy.div(class_="form-check")[radio_input,
radio_label])
+ elements.extend(radios)
+ widget = htm.div[radios]
+
+ case Widget.SELECT:
+ choices = _get_choices(field_info)
+ options = [
+ htpy.option(
+ value=val,
+ selected="" if (val == field_value) else None,
+ )[label]
+ for val, label in choices
+ ]
+ widget = htpy.select(**base_attrs)[options]
+
+ case Widget.TEXT:
+ attrs = {**base_attrs, "type": "text"}
+ if field_value:
+ attrs["value"] = str(field_value)
+ widget = htpy.input(**attrs)
+
+ case Widget.TEXTAREA:
+ widget = htpy.textarea(**base_attrs)[field_value or ""]
+
+ case Widget.URL:
+ attrs = {**base_attrs, "type": "url"}
+ if field_value:
+ attrs["value"] = str(field_value)
+ widget = htpy.input(**attrs)
+
+ if not elements:
+ elements.append(widget)
+
+ if field_errors:
+ error_text = " ".join(field_errors)
+ error_div = htm.div(".invalid-feedback.d-block")[error_text]
+ elements.append(error_div)
+
+ return htm.div[elements] if len(elements) > 1 else elements[0]
+
+
+def _get_choices(field_info: pydantic.fields.FieldInfo) -> list[tuple[str,
str]]:
+ annotation = field_info.annotation
+ origin = get_origin(annotation)
+
+ if origin is Literal:
+ return [(v, v) for v in get_args(annotation)]
+
+ if origin is set:
+ args = get_args(annotation)
+ if args and hasattr(args[0], "__members__"):
+ enum_class = args[0]
+ return [(member.value, member.value) for member in enum_class]
+
+ if origin is list:
+ args = get_args(annotation)
+ if args and get_origin(args[0]) is Literal:
+ return [(v, v) for v in get_args(args[0])]
+
+ return []
+
+
+def _get_widget_classes(widget_type: Widget, has_errors: list[str] | None) ->
str:
+ match widget_type:
+ case Widget.SELECT:
+ base_class = "form-select"
+ case Widget.CHECKBOX | Widget.RADIO | Widget.CHECKBOXES:
+ return "form-check-input"
+ case _:
+ base_class = "form-control"
+
+ if has_errors:
+ return f"{base_class} is-invalid"
+ return base_class
+
+
+def _get_widget_type(field_info: pydantic.fields.FieldInfo) -> Widget: #
noqa: C901
+ json_schema_extra = field_info.json_schema_extra or {}
+ if isinstance(json_schema_extra, dict) and "widget" in json_schema_extra:
+ widget_value = json_schema_extra["widget"]
+ if isinstance(widget_value, str):
+ try:
+ return Widget(widget_value)
+ except ValueError:
+ pass
+
+ annotation = field_info.annotation
+ origin = get_origin(annotation)
+
+ if (annotation is not None) and hasattr(annotation, "__value__"):
+ annotation = annotation.__value__
+ origin = get_origin(annotation)
+
+ if isinstance(annotation, types.UnionType) or (origin is type(None)):
+ args = get_args(annotation)
+ non_none_types = [arg for arg in args if (arg is not type(None))]
+ if non_none_types:
+ annotation = non_none_types[0]
+ origin = get_origin(annotation)
+
+ if origin is Annotated:
+ args = get_args(annotation)
+ annotation = args[0]
+ origin = get_origin(annotation)
+
+ if annotation is datastructures.FileStorage:
+ return Widget.FILE
+
+ if annotation is bool:
+ return Widget.CHECKBOX
+
+ if annotation is pydantic.EmailStr:
+ return Widget.EMAIL
+
+ if annotation in (int, float):
+ return Widget.NUMBER
+
+ if origin is Literal:
+ return Widget.SELECT
+
+ if origin is set:
+ args = get_args(annotation)
+ if args and hasattr(args[0], "__members__"):
+ return Widget.CHECKBOXES
+
+ if origin is list:
+ args = get_args(annotation)
+ if args:
+ first_arg = args[0]
+ if get_origin(first_arg) is Literal:
+ return Widget.CHECKBOXES
+ if first_arg is datastructures.FileStorage:
+ return Widget.FILES
+ if hasattr(first_arg, "__value__"):
+ inner = first_arg.__value__
+ inner_origin = get_origin(inner)
+ if inner_origin is Annotated:
+ inner_args = get_args(inner)
+ if inner_args and (inner_args[0] is
datastructures.FileStorage):
+ return Widget.FILES
+
+ return Widget.TEXT
diff --git a/atr/get/test.py b/atr/get/test.py
index e74454a..35cbe2f 100644
--- a/atr/get/test.py
+++ b/atr/get/test.py
@@ -20,10 +20,31 @@ import asfquart.session
import atr.blueprints.get as get
import atr.config as config
+import atr.form as form
import atr.get.root as root
+import atr.htm as htm
+import atr.shared as shared
+import atr.template as template
import atr.web as web
[email protected]("/test/empty")
+async def test_empty(session: web.Committer | None) -> str:
+ empty_form = await form.render_columns(
+ model_cls=form.Empty,
+ submit_label="Submit empty form",
+ action="/test/empty",
+ )
+
+ forms_html = htm.div[
+ htm.h2["Empty form"],
+ htm.p["This form only validates the CSRF token and contains no other
fields."],
+ empty_form,
+ ]
+
+ return await template.blank(title="Test empty form", content=forms_html)
+
+
@get.public("/test/login")
async def test_login(session: web.Committer | None) -> web.WerkzeugResponse:
if not config.get().ALLOW_TESTS:
@@ -42,3 +63,43 @@ async def test_login(session: web.Committer | None) ->
web.WerkzeugResponse:
asfquart.session.write(session_data)
return await web.redirect(root.index)
+
+
[email protected]("/test/multiple")
+async def test_multiple(session: web.Committer | None) -> str:
+ apple_form = await form.render_columns(
+ model_cls=shared.test.AppleForm,
+ submit_label="Order apples",
+ action="/test/multiple",
+ )
+
+ banana_form = await form.render_columns(
+ model_cls=shared.test.BananaForm,
+ submit_label="Order bananas",
+ action="/test/multiple",
+ )
+
+ forms_html = htm.div[
+ htm.h2["Apple order form"],
+ apple_form,
+ htm.h2["Banana order form"],
+ banana_form,
+ ]
+
+ return await template.blank(title="Test multiple forms",
content=forms_html)
+
+
[email protected]("/test/single")
+async def test_single(session: web.Committer | None) -> str:
+ single_form = await form.render_columns(
+ model_cls=shared.test.SingleForm,
+ submit_label="Submit",
+ action="/test/single",
+ )
+
+ forms_html = htm.div[
+ htm.h2["Single form"],
+ single_form,
+ ]
+
+ return await template.blank(title="Test single form", content=forms_html)
diff --git a/atr/models/schema.py b/atr/models/schema.py
index 04f8c33..6bcb013 100644
--- a/atr/models/schema.py
+++ b/atr/models/schema.py
@@ -32,6 +32,18 @@ class Strict(pydantic.BaseModel):
model_config = pydantic.ConfigDict(extra="forbid", strict=True,
validate_assignment=True)
+class Form(pydantic.BaseModel):
+ model_config = pydantic.ConfigDict(
+ extra="forbid",
+ strict=False,
+ validate_assignment=True,
+ arbitrary_types_allowed=True,
+ str_strip_whitespace=True,
+ )
+
+ csrf_token: str | None = None
+
+
def alias(alias_name: str) -> Any:
"""Helper to create a Pydantic FieldInfo object with only an alias."""
return Field(alias=alias_name)
diff --git a/atr/post/__init__.py b/atr/post/__init__.py
index 576ce08..c014cf3 100644
--- a/atr/post/__init__.py
+++ b/atr/post/__init__.py
@@ -30,6 +30,7 @@ import atr.post.resolve as resolve
import atr.post.revisions as revisions
import atr.post.sbom as sbom
import atr.post.start as start
+import atr.post.test as test
import atr.post.tokens as tokens
import atr.post.upload as upload
import atr.post.user as user
@@ -52,6 +53,7 @@ __all__ = [
"revisions",
"sbom",
"start",
+ "test",
"tokens",
"upload",
"user",
diff --git a/atr/post/test.py b/atr/post/test.py
new file mode 100644
index 0000000..3e3ef1e
--- /dev/null
+++ b/atr/post/test.py
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import quart
+
+import atr.blueprints.post as post
+import atr.get as get
+import atr.log as log
+import atr.shared as shared
+import atr.web as web
+
+
[email protected]("/test/empty")
[email protected]()
+async def test_empty(session: web.Committer | None) -> web.WerkzeugResponse:
+ msg = "Empty form submitted successfully"
+ log.info(msg)
+ await quart.flash(msg, "success")
+ return await web.redirect(get.test.test_empty)
+
+
[email protected]("/test/multiple")
[email protected](shared.test.MultipleForm)
+async def test_multiple(session: web.Committer | None, form:
shared.test.MultipleForm) -> web.WerkzeugResponse:
+ match form:
+ case shared.test.AppleForm() as apple:
+ msg = f"Apple order received: variety={apple.variety},
quantity={apple.quantity}, organic={apple.organic}"
+ log.info(msg)
+ await quart.flash(msg, "success")
+
+ case shared.test.BananaForm() as banana:
+ msg = f"Banana order received: ripeness={banana.ripeness},
bunch_size={banana.bunch_size}"
+ log.info(msg)
+ await quart.flash(msg, "success")
+
+ return await web.redirect(get.test.test_multiple)
+
+
[email protected]("/test/single")
[email protected](shared.test.SingleForm)
+async def test_single(session: web.Committer | None, form:
shared.test.SingleForm) -> web.WerkzeugResponse:
+ file_names = [f.filename for f in form.files] if form.files else []
+ compatibility_names = [f.value for f in form.compatibility] if
form.compatibility else []
+ msg = (
+ f"Single form received:"
+ f" name={form.name},"
+ f" email={form.email},"
+ f" message={form.message},"
+ f" files={file_names},"
+ f" compatibility={compatibility_names}"
+ )
+ log.info(msg)
+ await quart.flash(msg, "success")
+
+ return await web.redirect(get.test.test_single)
diff --git a/atr/shared/__init__.py b/atr/shared/__init__.py
index a51a194..1426353 100644
--- a/atr/shared/__init__.py
+++ b/atr/shared/__init__.py
@@ -33,6 +33,7 @@ import atr.shared.keys as keys
import atr.shared.projects as projects
import atr.shared.resolve as resolve
import atr.shared.start as start
+import atr.shared.test as test
import atr.shared.tokens as tokens
import atr.shared.upload as upload
import atr.shared.user as user
@@ -193,6 +194,7 @@ __all__ = [
"projects",
"resolve",
"start",
+ "test",
"tokens",
"upload",
"user",
diff --git a/atr/shared/test.py b/atr/shared/test.py
new file mode 100644
index 0000000..971f477
--- /dev/null
+++ b/atr/shared/test.py
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import enum
+from typing import Annotated, Literal
+
+import pydantic
+
+import atr.form as form
+
+type APPLE = Literal["apple"]
+type BANANA = Literal["banana"]
+
+
+class Compatibility(enum.Enum):
+ Alpha = "Alpha"
+ Beta = "Beta"
+ Gamma = "Gamma"
+
+
+class AppleForm(form.Form):
+ variant: APPLE = form.value(APPLE)
+ variety: Literal["Granny Smith", "Honeycrisp", "Gala"] = form.label("Apple
variety")
+ quantity: form.Int = form.label("Number of apples")
+ organic: form.Bool = form.label("Organic?")
+
+
+class BananaForm(form.Form):
+ variant: BANANA = form.value(BANANA)
+ ripeness: Literal["Green", "Yellow", "Brown"] = form.label("Ripeness
level", widget=form.Widget.RADIO)
+ bunch_size: form.Int = form.label("Number of bananas in bunch")
+
+
+type MultipleForm = Annotated[
+ AppleForm | BananaForm,
+ form.DISCRIMINATOR,
+]
+
+
+class SingleForm(form.Form):
+ name: str = form.label("Full name")
+ email: form.Email = form.label("Email address")
+ message: str = form.label("Message")
+ files: form.FileList = form.label("Files to upload")
+ compatibility: form.Set[Compatibility] = form.label("Compatibility")
+
+ @pydantic.field_validator("email")
+ @classmethod
+ def validate_email(cls, value: str, info: pydantic.ValidationInfo) -> str:
+ if value == "":
+ return value
+
+ session = form.session(info)
+
+ if session is None:
+ return value
+
+ expected_email = f"{session.asf_uid}@apache.org"
+ if value != expected_email:
+ raise ValueError(f"Email must be empty or {expected_email}")
+
+ return value
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]