This is an automated email from the ASF dual-hosted git repository.

sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git


The following commit(s) were added to refs/heads/main by this push:
     new c8f491b  Add a module for processing HTML forms with improved type safety
c8f491b is described below

commit c8f491bb1e292e1bcaf3723856cdd0675ea3410d
Author: Sean B. Palmer <[email protected]>
AuthorDate: Thu Nov 6 20:46:37 2025 +0000

    Add a module for processing HTML forms with improved type safety
---
 atr/blueprints/post.py |  63 ++++++
 atr/form.py            | 513 +++++++++++++++++++++++++++++++++++++++++++++++++
 atr/get/test.py        |  61 ++++++
 atr/models/schema.py   |  12 ++
 atr/post/__init__.py   |   2 +
 atr/post/test.py       |  69 +++++++
 atr/shared/__init__.py |   2 +
 atr/shared/test.py     |  76 ++++++++
 8 files changed, 798 insertions(+)

diff --git a/atr/blueprints/post.py b/atr/blueprints/post.py
index 144bcdf..c539c70 100644
--- a/atr/blueprints/post.py
+++ b/atr/blueprints/post.py
@@ -23,8 +23,10 @@ from typing import Any
 import asfquart.auth as auth
 import asfquart.base as base
 import asfquart.session
+import pydantic
 import quart
 
+import atr.form
 import atr.log as log
 import atr.web as web
 
@@ -84,6 +86,67 @@ def committer(path: str) -> Callable[[web.CommitterRouteFunction[Any]], web.Rout
     return decorator
 
 
+def empty() -> Callable[[Callable[..., Awaitable[Any]]], Callable[..., Awaitable[Any]]]:
+    # This means that instead of:
+    #
+    # @post.form(form.Empty)
+    # async def test_empty(
+    #     session: web.Committer | None,
+    #     form: form.Empty,
+    # ) -> web.WerkzeugResponse:
+    #     pass
+    #
+    # We can use:
+    #
+    # @post.empty()
+    # async def test_empty(
+    #     session: web.Committer | None,
+    # ) -> web.WerkzeugResponse:
+    #     pass
+    def decorator(func: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
+        async def wrapper(session: web.Committer | None, *args: Any, **kwargs: Any) -> Any:
+            try:
+                form_data = await atr.form.quart_request()
+                context = {"session": session}
+                atr.form.validate(atr.form.Empty, form_data, context)
+                return await func(session, *args, **kwargs)
+            except pydantic.ValidationError as e:
+                # This presumably should not happen
+                log.warning(f"Empty form validation error (CSRF): {e}")
+                await quart.flash("Invalid form submission. Please try again.", "error")
+                return quart.redirect(quart.request.path)
+
+        wrapper.__name__ = func.__name__
+        wrapper.__doc__ = func.__doc__
+        wrapper.__annotations__ = func.__annotations__.copy()
+        return wrapper
+
+    return decorator
+
+
+def form(
+    form_cls: Any,
+) -> Callable[[Callable[..., Awaitable[Any]]], Callable[..., Awaitable[Any]]]:
+    def decorator(func: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
+        async def wrapper(session: web.Committer | None, *args: Any, **kwargs: Any) -> Any:
+            try:
+                form_data = await atr.form.quart_request()
+                context = {"session": session}
+                validated_form = atr.form.validate(form_cls, form_data, context)
+                return await func(session, validated_form, *args, **kwargs)
+            except pydantic.ValidationError as e:
+                log.warning(f"Form validation error: {e}")
+                await quart.flash(f"Validation error: {e}", "error")
+                return quart.redirect(quart.request.path)
+
+        wrapper.__name__ = func.__name__
+        wrapper.__doc__ = func.__doc__
+        wrapper.__annotations__ = func.__annotations__.copy()
+        return wrapper
+
+    return decorator
+
+
 def public(path: str) -> Callable[[Callable[..., Awaitable[Any]]], web.RouteFunction[Any]]:
     def decorator(func: Callable[..., Awaitable[Any]]) -> web.RouteFunction[Any]:
         async def wrapper(*args: Any, **kwargs: Any) -> Any:
diff --git a/atr/form.py b/atr/form.py
new file mode 100644
index 0000000..d6754d3
--- /dev/null
+++ b/atr/form.py
@@ -0,0 +1,513 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import enum
+import types
+from typing import TYPE_CHECKING, Annotated, Any, Final, Literal, get_args, get_origin
+
+import htpy
+import pydantic
+import pydantic.functional_validators as functional_validators
+import quart
+import quart.datastructures as datastructures
+import quart_wtf.utils as utils
+
+import atr.htm as htm
+import atr.models.schema as schema
+
+if TYPE_CHECKING:
+    from collections.abc import Iterator
+
+    import markupsafe
+
+    import atr.web as web
+
+DISCRIMINATOR_NAME: Final[str] = "variant"
+DISCRIMINATOR: Final[Any] = schema.discriminator(DISCRIMINATOR_NAME)
+
+
+class Form(schema.Form):
+    pass
+
+
+class Empty(Form):
+    pass
+
+
+class Widget(enum.Enum):
+    CHECKBOX = "checkbox"
+    CHECKBOXES = "checkboxes"
+    EMAIL = "email"
+    FILE = "file"
+    FILES = "files"
+    NUMBER = "number"
+    RADIO = "radio"
+    SELECT = "select"
+    TEXT = "text"
+    TEXTAREA = "textarea"
+    URL = "url"
+
+
+def label(description: str, *, default: Any = ..., widget: Widget | None = None) -> Any:
+    extra: dict[str, Any] = {"widget": widget.value} if widget else {}
+    return pydantic.Field(default, description=description, json_schema_extra=extra)
+
+
+def session(info: pydantic.ValidationInfo) -> web.Committer | None:
+    ctx: dict[str, Any] = info.context or {}
+    return ctx.get("session")
+
+
+async def quart_request() -> dict[str, Any]:
+    form_data = await quart.request.form
+    files_data = await quart.request.files
+
+    combined_data = {}
+    for key in form_data.keys():
+        # This is a compromise
+        # Some things expect single values, and some expect lists
+        values = form_data.getlist(key)
+        if len(values) == 1:
+            combined_data[key] = values[0]
+        else:
+            combined_data[key] = values
+
+    files_by_name: dict[str, list[datastructures.FileStorage]] = {}
+    for key in files_data.keys():
+        file_list = files_data.getlist(key)
+        # When no files are uploaded, the browser may supply a file with an empty filename
+        # We filter that out here
+        non_empty_files = [f for f in file_list if f.filename]
+        if non_empty_files:
+            files_by_name[key] = non_empty_files
+
+    for key, file_list in files_by_name.items():
+        if key in combined_data:
+            raise ValueError(f"Files key {key} already exists in form data")
+        combined_data[key] = file_list
+
+    return combined_data
+
+
+async def render_columns(
+    model_cls: type[Form],
+    action: str | None = None,
+    form_classes: str = ".atr-canary",
+    submit_classes: str = "btn-primary",
+    submit_label: str = "Submit",
+    defaults: dict[str, Any] | None = None,
+    errors: dict[str, list[str]] | None = None,
+) -> htm.Element:
+    if action is None:
+        action = quart.request.path
+
+    label_classes = "col-sm-3 col-form-label text-sm-end"
+
+    field_rows: list[htm.Element] = []
+    hidden_fields: list[htm.Element | htm.VoidElement | markupsafe.Markup] = []
+
+    csrf_token = utils.generate_csrf()
+    hidden_fields.append(htpy.input(type="hidden", name="csrf_token", value=csrf_token))
+
+    for field_name, field_info in model_cls.model_fields.items():
+        if field_name == "csrf_token":
+            continue
+
+        if defaults:
+            field_value = defaults.get(field_name)
+        elif not field_info.is_required():
+            # Use the Pydantic default if no user default are provided
+            field_value = field_info.get_default(call_default_factory=True)
+        else:
+            field_value = None
+        field_errors = errors.get(field_name) if errors else None
+
+        if (field_name == DISCRIMINATOR_NAME) and (field_info.default is not None):
+            default_value = field_info.default
+            hidden_fields.append(htpy.input(type="hidden", name=DISCRIMINATOR_NAME, value=default_value))
+            continue
+
+        label_text = field_info.description or field_name.replace("_", " ").title()
+        is_required = field_info.is_required()
+
+        label_elem = htpy.label(for_=field_name, class_=label_classes)[label_text]
+
+        widget_elem = _render_widget(
+            field_name=field_name,
+            field_info=field_info,
+            field_value=field_value,
+            field_errors=field_errors,
+            is_required=is_required,
+        )
+
+        row_div = htm.div(".mb-3.row")
+        widget_div = htm.div(".col-sm-8")
+        field_rows.append(row_div[label_elem, widget_div[widget_elem]])
+
+    form_children: list[htm.Element | htm.VoidElement | markupsafe.Markup] = hidden_fields + field_rows
+
+    submit_button = htpy.button(type="submit", class_=f"btn {submit_classes}")[submit_label]
+    submit_div = htm.div(".col-sm-9.offset-sm-3")
+    submit_row = htm.div(".row")[submit_div[submit_button]]
+    form_children.append(submit_row)
+
+    return htm.form(form_classes, action=action, method="post", enctype="multipart/form-data")[form_children]
+
+
+def to_bool(v: Any) -> bool:
+    if isinstance(v, bool):
+        return v
+    if v == "on":
+        return True
+    raise ValueError(f"Cannot convert {v!r} to boolean")
+
+
+def to_enum_set[EnumType: enum.Enum](v: Any, enum_class: type[EnumType]) -> set[EnumType]:
+    members: dict[str, EnumType] = {member.value: member for member in enum_class}
+    if isinstance(v, set):
+        return {item for item in v if isinstance(item, enum_class)}
+    if isinstance(v, list):
+        return {members[item] for item in v if item in members}
+    if isinstance(v, str):
+        if v in members:
+            return {members[v]}
+        raise ValueError(f"Invalid enum value: {v!r}")
+    raise ValueError(f"Expected a set of enum values, got {type(v).__name__}")
+
+
+def to_filestorage(v: Any) -> datastructures.FileStorage:
+    if not isinstance(v, datastructures.FileStorage):
+        raise ValueError("Expected an uploaded file")
+    return v
+
+
+def to_filestorage_list(v: Any) -> list[datastructures.FileStorage]:
+    if isinstance(v, list):
+        result = []
+        for item in v:
+            if not isinstance(item, datastructures.FileStorage):
+                raise ValueError("Expected a list of uploaded files")
+            result.append(item)
+        return result
+    if isinstance(v, datastructures.FileStorage):
+        return [v]
+    raise ValueError("Expected a list of uploaded files")
+
+
+def to_int(v: Any) -> int:
+    # if v == "":
+    #     return 0
+    try:
+        return int(v)
+    except ValueError:
+        raise ValueError(f"Invalid integer value: {v!r}")
+
+
+# Validator types come before other functions
+# We must not use the "type" keyword here, otherwise Pydantic complains
+
+Bool = Annotated[
+    bool,
+    functional_validators.BeforeValidator(to_bool),
+    pydantic.Field(default=False),
+]
+
+Email = pydantic.EmailStr
+
+File = Annotated[
+    datastructures.FileStorage,
+    functional_validators.BeforeValidator(to_filestorage),
+]
+
+FileList = Annotated[
+    list[datastructures.FileStorage],
+    functional_validators.BeforeValidator(to_filestorage_list),
+    pydantic.Field(default_factory=list),
+]
+
+Int = Annotated[
+    int,
+    functional_validators.BeforeValidator(to_int),
+]
+
+
+class Set[EnumType: enum.Enum]:
+    def __iter__(self) -> Iterator[EnumType]:
+        # For type checkers
+        raise NotImplementedError
+
+    @staticmethod
+    def __class_getitem__(enum_class: type[EnumType]):
+        def validator(v: Any) -> set[EnumType]:
+            return to_enum_set(v, enum_class)
+
+        return Annotated[
+            set[enum_class],
+            functional_validators.BeforeValidator(validator),
+            pydantic.Field(default_factory=set),
+        ]
+
+
+def validate(model_cls: Any, form: dict[str, Any], context: dict[str, Any] | None = None) -> pydantic.BaseModel:
+    # Since pydantic.TypeAdapter accepts Any, we do the same
+    return pydantic.TypeAdapter(model_cls).validate_python(form, context=context)
+
+
+def value(type_alias: Any) -> Any:
+    # This is for unwrapping from Literal for discriminators
+    if hasattr(type_alias, "__value__"):
+        type_alias = type_alias.__value__
+    args = get_args(type_alias)
+    if args:
+        return args[0]
+    raise ValueError(f"Cannot extract default value from {type_alias}")
+
+
+def widget(widget_type: Widget) -> Any:
+    return pydantic.Field(..., json_schema_extra={"widget": widget_type.value})
+
+
+def _render_widget(  # noqa: C901
+    field_name: str,
+    field_info: pydantic.fields.FieldInfo,
+    field_value: Any,
+    field_errors: list[str] | None,
+    is_required: bool,
+) -> htm.Element | htm.VoidElement:
+    widget_type = _get_widget_type(field_info)
+    widget_classes = _get_widget_classes(widget_type, field_errors)
+
+    base_attrs: dict[str, str] = {"name": field_name, "id": field_name, "class_": widget_classes}
+
+    elements: list[htm.Element | htm.VoidElement] = []
+
+    match widget_type:
+        case Widget.CHECKBOX:
+            attrs: dict[str, str] = {
+                "type": "checkbox",
+                "name": field_name,
+                "id": field_name,
+                "class_": "form-check-input",
+            }
+            if field_value:
+                attrs["checked"] = ""
+            widget = htpy.input(**attrs)
+
+        case Widget.CHECKBOXES:
+            choices = _get_choices(field_info)
+            if isinstance(field_value, set):
+                selected_values = [item.value for item in field_value]
+            else:
+                selected_values = field_value if isinstance(field_value, list) else []
+            checkboxes = []
+            for val, label in choices:
+                checkbox_id = f"{field_name}_{val}"
+                checkbox_attrs: dict[str, str] = {
+                    "type": "checkbox",
+                    "name": field_name,
+                    "id": checkbox_id,
+                    "value": val,
+                    "class_": "form-check-input",
+                }
+                if val in selected_values:
+                    checkbox_attrs["checked"] = ""
+                checkbox_input = htpy.input(**checkbox_attrs)
+                checkbox_label = htpy.label(for_=checkbox_id, class_="form-check-label")[label]
+                checkboxes.append(htpy.div(class_="form-check")[checkbox_input, checkbox_label])
+            elements.extend(checkboxes)
+            widget = htm.div[checkboxes]
+
+        case Widget.EMAIL:
+            attrs = {**base_attrs, "type": "email"}
+            if field_value:
+                attrs["value"] = str(field_value)
+            widget = htpy.input(**attrs)
+
+        case Widget.FILE:
+            widget = htpy.input(type="file", **base_attrs)
+
+        case Widget.FILES:
+            attrs = {**base_attrs, "multiple": ""}
+            widget = htpy.input(type="file", **attrs)
+
+        case Widget.NUMBER:
+            attrs = {**base_attrs, "type": "number"}
+            attrs["value"] = "0" if (field_value is None) else str(field_value)
+            widget = htpy.input(**attrs)
+
+        case Widget.RADIO:
+            choices = _get_choices(field_info)
+            radios = []
+            for val, label in choices:
+                radio_id = f"{field_name}_{val}"
+                radio_attrs: dict[str, str] = {
+                    "type": "radio",
+                    "name": field_name,
+                    "id": radio_id,
+                    "value": val,
+                    "class_": "form-check-input",
+                }
+                if is_required:
+                    radio_attrs["required"] = ""
+                if val == field_value:
+                    radio_attrs["checked"] = ""
+                radio_input = htpy.input(**radio_attrs)
+                radio_label = htpy.label(for_=radio_id, class_="form-check-label")[label]
+                radios.append(htpy.div(class_="form-check")[radio_input, radio_label])
+            elements.extend(radios)
+            widget = htm.div[radios]
+
+        case Widget.SELECT:
+            choices = _get_choices(field_info)
+            options = [
+                htpy.option(
+                    value=val,
+                    selected="" if (val == field_value) else None,
+                )[label]
+                for val, label in choices
+            ]
+            widget = htpy.select(**base_attrs)[options]
+
+        case Widget.TEXT:
+            attrs = {**base_attrs, "type": "text"}
+            if field_value:
+                attrs["value"] = str(field_value)
+            widget = htpy.input(**attrs)
+
+        case Widget.TEXTAREA:
+            widget = htpy.textarea(**base_attrs)[field_value or ""]
+
+        case Widget.URL:
+            attrs = {**base_attrs, "type": "url"}
+            if field_value:
+                attrs["value"] = str(field_value)
+            widget = htpy.input(**attrs)
+
+    if not elements:
+        elements.append(widget)
+
+    if field_errors:
+        error_text = " ".join(field_errors)
+        error_div = htm.div(".invalid-feedback.d-block")[error_text]
+        elements.append(error_div)
+
+    return htm.div[elements] if len(elements) > 1 else elements[0]
+
+
+def _get_choices(field_info: pydantic.fields.FieldInfo) -> list[tuple[str, str]]:
+    annotation = field_info.annotation
+    origin = get_origin(annotation)
+
+    if origin is Literal:
+        return [(v, v) for v in get_args(annotation)]
+
+    if origin is set:
+        args = get_args(annotation)
+        if args and hasattr(args[0], "__members__"):
+            enum_class = args[0]
+            return [(member.value, member.value) for member in enum_class]
+
+    if origin is list:
+        args = get_args(annotation)
+        if args and get_origin(args[0]) is Literal:
+            return [(v, v) for v in get_args(args[0])]
+
+    return []
+
+
+def _get_widget_classes(widget_type: Widget, has_errors: list[str] | None) -> str:
+    match widget_type:
+        case Widget.SELECT:
+            base_class = "form-select"
+        case Widget.CHECKBOX | Widget.RADIO | Widget.CHECKBOXES:
+            return "form-check-input"
+        case _:
+            base_class = "form-control"
+
+    if has_errors:
+        return f"{base_class} is-invalid"
+    return base_class
+
+
+def _get_widget_type(field_info: pydantic.fields.FieldInfo) -> Widget:  # noqa: C901
+    json_schema_extra = field_info.json_schema_extra or {}
+    if isinstance(json_schema_extra, dict) and "widget" in json_schema_extra:
+        widget_value = json_schema_extra["widget"]
+        if isinstance(widget_value, str):
+            try:
+                return Widget(widget_value)
+            except ValueError:
+                pass
+
+    annotation = field_info.annotation
+    origin = get_origin(annotation)
+
+    if (annotation is not None) and hasattr(annotation, "__value__"):
+        annotation = annotation.__value__
+        origin = get_origin(annotation)
+
+    if isinstance(annotation, types.UnionType) or (origin is type(None)):
+        args = get_args(annotation)
+        non_none_types = [arg for arg in args if (arg is not type(None))]
+        if non_none_types:
+            annotation = non_none_types[0]
+            origin = get_origin(annotation)
+
+    if origin is Annotated:
+        args = get_args(annotation)
+        annotation = args[0]
+        origin = get_origin(annotation)
+
+    if annotation is datastructures.FileStorage:
+        return Widget.FILE
+
+    if annotation is bool:
+        return Widget.CHECKBOX
+
+    if annotation is pydantic.EmailStr:
+        return Widget.EMAIL
+
+    if annotation in (int, float):
+        return Widget.NUMBER
+
+    if origin is Literal:
+        return Widget.SELECT
+
+    if origin is set:
+        args = get_args(annotation)
+        if args and hasattr(args[0], "__members__"):
+            return Widget.CHECKBOXES
+
+    if origin is list:
+        args = get_args(annotation)
+        if args:
+            first_arg = args[0]
+            if get_origin(first_arg) is Literal:
+                return Widget.CHECKBOXES
+            if first_arg is datastructures.FileStorage:
+                return Widget.FILES
+            if hasattr(first_arg, "__value__"):
+                inner = first_arg.__value__
+                inner_origin = get_origin(inner)
+                if inner_origin is Annotated:
+                    inner_args = get_args(inner)
+                    if inner_args and (inner_args[0] is datastructures.FileStorage):
+                        return Widget.FILES
+
+    return Widget.TEXT
diff --git a/atr/get/test.py b/atr/get/test.py
index e74454a..35cbe2f 100644
--- a/atr/get/test.py
+++ b/atr/get/test.py
@@ -20,10 +20,31 @@ import asfquart.session
 
 import atr.blueprints.get as get
 import atr.config as config
+import atr.form as form
 import atr.get.root as root
+import atr.htm as htm
+import atr.shared as shared
+import atr.template as template
 import atr.web as web
 
 
+@get.public("/test/empty")
+async def test_empty(session: web.Committer | None) -> str:
+    empty_form = await form.render_columns(
+        model_cls=form.Empty,
+        submit_label="Submit empty form",
+        action="/test/empty",
+    )
+
+    forms_html = htm.div[
+        htm.h2["Empty form"],
+        htm.p["This form only validates the CSRF token and contains no other fields."],
+        empty_form,
+    ]
+
+    return await template.blank(title="Test empty form", content=forms_html)
+
+
 @get.public("/test/login")
 async def test_login(session: web.Committer | None) -> web.WerkzeugResponse:
     if not config.get().ALLOW_TESTS:
@@ -42,3 +63,43 @@ async def test_login(session: web.Committer | None) -> web.WerkzeugResponse:
 
     asfquart.session.write(session_data)
     return await web.redirect(root.index)
+
+
+@get.public("/test/multiple")
+async def test_multiple(session: web.Committer | None) -> str:
+    apple_form = await form.render_columns(
+        model_cls=shared.test.AppleForm,
+        submit_label="Order apples",
+        action="/test/multiple",
+    )
+
+    banana_form = await form.render_columns(
+        model_cls=shared.test.BananaForm,
+        submit_label="Order bananas",
+        action="/test/multiple",
+    )
+
+    forms_html = htm.div[
+        htm.h2["Apple order form"],
+        apple_form,
+        htm.h2["Banana order form"],
+        banana_form,
+    ]
+
+    return await template.blank(title="Test multiple forms", content=forms_html)
+
+
+@get.public("/test/single")
+async def test_single(session: web.Committer | None) -> str:
+    single_form = await form.render_columns(
+        model_cls=shared.test.SingleForm,
+        submit_label="Submit",
+        action="/test/single",
+    )
+
+    forms_html = htm.div[
+        htm.h2["Single form"],
+        single_form,
+    ]
+
+    return await template.blank(title="Test single form", content=forms_html)
diff --git a/atr/models/schema.py b/atr/models/schema.py
index 04f8c33..6bcb013 100644
--- a/atr/models/schema.py
+++ b/atr/models/schema.py
@@ -32,6 +32,18 @@ class Strict(pydantic.BaseModel):
     model_config = pydantic.ConfigDict(extra="forbid", strict=True, validate_assignment=True)
 
 
+class Form(pydantic.BaseModel):
+    model_config = pydantic.ConfigDict(
+        extra="forbid",
+        strict=False,
+        validate_assignment=True,
+        arbitrary_types_allowed=True,
+        str_strip_whitespace=True,
+    )
+
+    csrf_token: str | None = None
+
+
 def alias(alias_name: str) -> Any:
     """Helper to create a Pydantic FieldInfo object with only an alias."""
     return Field(alias=alias_name)
diff --git a/atr/post/__init__.py b/atr/post/__init__.py
index 576ce08..c014cf3 100644
--- a/atr/post/__init__.py
+++ b/atr/post/__init__.py
@@ -30,6 +30,7 @@ import atr.post.resolve as resolve
 import atr.post.revisions as revisions
 import atr.post.sbom as sbom
 import atr.post.start as start
+import atr.post.test as test
 import atr.post.tokens as tokens
 import atr.post.upload as upload
 import atr.post.user as user
@@ -52,6 +53,7 @@ __all__ = [
     "revisions",
     "sbom",
     "start",
+    "test",
     "tokens",
     "upload",
     "user",
diff --git a/atr/post/test.py b/atr/post/test.py
new file mode 100644
index 0000000..3e3ef1e
--- /dev/null
+++ b/atr/post/test.py
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import quart
+
+import atr.blueprints.post as post
+import atr.get as get
+import atr.log as log
+import atr.shared as shared
+import atr.web as web
+
+
+@post.public("/test/empty")
+@post.empty()
+async def test_empty(session: web.Committer | None) -> web.WerkzeugResponse:
+    msg = "Empty form submitted successfully"
+    log.info(msg)
+    await quart.flash(msg, "success")
+    return await web.redirect(get.test.test_empty)
+
+
+@post.public("/test/multiple")
+@post.form(shared.test.MultipleForm)
+async def test_multiple(session: web.Committer | None, form: shared.test.MultipleForm) -> web.WerkzeugResponse:
+    match form:
+        case shared.test.AppleForm() as apple:
+            msg = f"Apple order received: variety={apple.variety}, quantity={apple.quantity}, organic={apple.organic}"
+            log.info(msg)
+            await quart.flash(msg, "success")
+
+        case shared.test.BananaForm() as banana:
+            msg = f"Banana order received: ripeness={banana.ripeness}, bunch_size={banana.bunch_size}"
+            log.info(msg)
+            await quart.flash(msg, "success")
+
+    return await web.redirect(get.test.test_multiple)
+
+
+@post.public("/test/single")
+@post.form(shared.test.SingleForm)
+async def test_single(session: web.Committer | None, form: shared.test.SingleForm) -> web.WerkzeugResponse:
+    file_names = [f.filename for f in form.files] if form.files else []
+    compatibility_names = [f.value for f in form.compatibility] if form.compatibility else []
+    msg = (
+        f"Single form received:"
+        f" name={form.name},"
+        f" email={form.email},"
+        f" message={form.message},"
+        f" files={file_names},"
+        f" compatibility={compatibility_names}"
+    )
+    log.info(msg)
+    await quart.flash(msg, "success")
+
+    return await web.redirect(get.test.test_single)
diff --git a/atr/shared/__init__.py b/atr/shared/__init__.py
index a51a194..1426353 100644
--- a/atr/shared/__init__.py
+++ b/atr/shared/__init__.py
@@ -33,6 +33,7 @@ import atr.shared.keys as keys
 import atr.shared.projects as projects
 import atr.shared.resolve as resolve
 import atr.shared.start as start
+import atr.shared.test as test
 import atr.shared.tokens as tokens
 import atr.shared.upload as upload
 import atr.shared.user as user
@@ -193,6 +194,7 @@ __all__ = [
     "projects",
     "resolve",
     "start",
+    "test",
     "tokens",
     "upload",
     "user",
diff --git a/atr/shared/test.py b/atr/shared/test.py
new file mode 100644
index 0000000..971f477
--- /dev/null
+++ b/atr/shared/test.py
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import enum
+from typing import Annotated, Literal
+
+import pydantic
+
+import atr.form as form
+
+type APPLE = Literal["apple"]
+type BANANA = Literal["banana"]
+
+
+class Compatibility(enum.Enum):
+    Alpha = "Alpha"
+    Beta = "Beta"
+    Gamma = "Gamma"
+
+
+class AppleForm(form.Form):
+    variant: APPLE = form.value(APPLE)
+    variety: Literal["Granny Smith", "Honeycrisp", "Gala"] = form.label("Apple variety")
+    quantity: form.Int = form.label("Number of apples")
+    organic: form.Bool = form.label("Organic?")
+
+
+class BananaForm(form.Form):
+    variant: BANANA = form.value(BANANA)
+    ripeness: Literal["Green", "Yellow", "Brown"] = form.label("Ripeness level", widget=form.Widget.RADIO)
+    bunch_size: form.Int = form.label("Number of bananas in bunch")
+
+
+type MultipleForm = Annotated[
+    AppleForm | BananaForm,
+    form.DISCRIMINATOR,
+]
+
+
+class SingleForm(form.Form):
+    name: str = form.label("Full name")
+    email: form.Email = form.label("Email address")
+    message: str = form.label("Message")
+    files: form.FileList = form.label("Files to upload")
+    compatibility: form.Set[Compatibility] = form.label("Compatibility")
+
+    @pydantic.field_validator("email")
+    @classmethod
+    def validate_email(cls, value: str, info: pydantic.ValidationInfo) -> str:
+        if value == "":
+            return value
+
+        session = form.session(info)
+
+        if session is None:
+            return value
+
+        expected_email = f"{session.asf_uid}@apache.org"
+        if value != expected_email:
+            raise ValueError(f"Email must be empty or {expected_email}")
+
+        return value


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to