This is an automated email from the ASF dual-hosted git repository.

sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-release.git


The following commit(s) were added to refs/heads/main by this push:
     new d3c0efd  Fix the order of interfaces in the routes module
d3c0efd is described below

commit d3c0efd6b40301ec92bdd040f9b09b80b245147a
Author: Sean B. Palmer <[email protected]>
AuthorDate: Mon Apr 14 20:59:23 2025 +0100

    Fix the order of interfaces in the routes module
---
 atr/routes/__init__.py | 241 ++++++++++++++++++++++++-------------------------
 1 file changed, 120 insertions(+), 121 deletions(-)

diff --git a/atr/routes/__init__.py b/atr/routes/__init__.py
index 872a5ca..398f86b 100644
--- a/atr/routes/__init__.py
+++ b/atr/routes/__init__.py
@@ -46,6 +46,7 @@ if asfquart.APP is ...:
     raise RuntimeError("APP is not set")
 
 P = ParamSpec("P")
+R = TypeVar("R", covariant=True)
 T = TypeVar("T")
 
 # TODO: Should get this from config, checking debug there
@@ -79,14 +80,6 @@ algorithms: Final[dict[int, str]] = {
 }
 
 
-class FlashError(RuntimeError): ...
-
-
-class MicrosecondsFormatter(logging.Formatter):
-    # Answers on a postcard if you know why Python decided to use a comma by default
-    default_msec_format = "%s.%03d"
-
-
 class AsyncFileHandler(logging.Handler):
     """A logging handler that writes logs asynchronously using aiofiles."""
 
@@ -158,7 +151,81 @@ class AsyncFileHandler(logging.Handler):
         super().close()
 
 
+# This is the type of functions to which we apply @committer_get
+# In other words, functions which accept CommitterSession as their first arg
+class CommitterRouteHandler(Protocol[R]):
+    """Protocol for @committer_get decorated functions."""
+
+    __name__: str
+    __doc__: str | None
+
+    def __call__(self, session: CommitterSession, *args: Any, **kwargs: Any) -> Awaitable[R]: ...
+
+
+class CommitterSession:
+    """Session with extra information about committers."""
+
+    def __init__(self, web_session: session.ClientSession) -> None:
+        self._projects: list[models.Project] | None = None
+        self._session = web_session
+
+    def __getattr__(self, name: str) -> Any:
+        # TODO: Not type safe, should subclass properly if possible
+        # For example, we can access session.no_such_attr and the type checkers won't notice
+        return getattr(self._session, name)
+
+    @property
+    def host(self) -> str:
+        request_host = quart.request.host
+        if ":" in request_host:
+            domain, port = request_host.split(":")
+            # Could be an IPv6 address, so need to check whether port is a valid integer
+            if port.isdigit():
+                return domain
+        return request_host
+
+    def only_user_releases(self, releases: Sequence[models.Release]) -> list[models.Release]:
+        return util.user_releases(self.uid, releases)
+
+    async def redirect(
+        self, route: CommitterRouteHandler[R], success: str | None = None, error: str | None = None, **kwargs: Any
+    ) -> response.Response:
+        """Redirect to a route with a success or error message."""
+        if success is not None:
+            await quart.flash(success, "success")
+        elif error is not None:
+            await quart.flash(error, "error")
+        return quart.redirect(util.as_url(route, **kwargs))
+
+    @property
+    async def user_candidate_drafts(self) -> list[models.Release]:
+        return await user.candidate_drafts(self.uid, user_projects=self._projects)
+
+    # @property
+    # async def user_committees(self) -> list[models.Committee]:
+    #     return ...
+
+    @property
+    async def user_projects(self) -> list[models.Project]:
+        if self._projects is None:
+            self._projects = await user.projects(self.uid)
+        return self._projects
+
+    @property
+    async def user_releases(self) -> list[models.Release]:
+        return await user.releases(self.uid)
+
+
+class FlashError(RuntimeError): ...
+
+
+class MicrosecondsFormatter(logging.Formatter):
+    # Answers on a postcard if you know why Python decided to use a comma by default
+    default_msec_format = "%s.%03d"
+
+
 # Setup a dedicated logger for route performance metrics
+# NOTE: This code block must come after AsyncFileHandler and MicrosecondsFormatter
 route_logger = logging.getLogger("route.performance")
 # Use custom formatter that properly includes microseconds
 # TODO: Is this actually UTC?
@@ -170,6 +237,17 @@ route_logger.setLevel(logging.INFO)
 route_logger.propagate = False
 
 
+# This is the type of functions to which we apply @app_route
+# In other words, functions which accept no session
+class RouteHandler(Protocol[R]):
+    """Protocol for @app_route decorated functions."""
+
+    __name__: str
+    __doc__: str | None
+
+    def __call__(self, *args: Any, **kwargs: Any) -> Awaitable[R]: ...
+
+
 def app_route(
     path: str, methods: list[str] | None = None, endpoint: str | None = None, measure_performance: bool = True
 ) -> Callable:
@@ -269,6 +347,40 @@ def app_route_performance_measure(route_path: str, http_methods: list[str] | Non
     return decorator
 
 
+# This decorator is an adaptor between @committer_get and @app_route functions
+def committer(
+    path: str, methods: list[str] | None = None, measure_performance: bool = True
+) -> Callable[[CommitterRouteHandler[R]], RouteHandler[R]]:
+    """Decorator for committer GET routes that provides an enhanced session object."""
+
+    def decorator(func: CommitterRouteHandler[R]) -> RouteHandler[R]:
+        async def wrapper(*args: Any, **kwargs: Any) -> R:
+            web_session = await session.read()
+            if web_session is None:
+                _authentication_failed()
+
+            enhanced_session = CommitterSession(web_session)
+            return await func(enhanced_session, *args, **kwargs)
+
+        # Generate a unique endpoint name
+        endpoint = func.__module__ + "_" + func.__name__
+
+        # Set the name before applying decorators
+        wrapper.__name__ = func.__name__
+        wrapper.__doc__ = func.__doc__
+        wrapper.__annotations__["endpoint"] = endpoint
+
+        # Apply decorators in reverse order
+        decorated = auth.require(auth.Requirements.committer)(wrapper)
+        decorated = app_route(
+            path, methods=methods or ["GET"], endpoint=endpoint, measure_performance=measure_performance
+        )(decorated)
+
+        return decorated
+
+    return decorator
+
+
 def format_datetime(timestamp: int) -> str:
     """Format a Unix timestamp into a human readable datetime string."""
     return datetime.datetime.fromtimestamp(timestamp, tz=datetime.UTC).strftime("%Y-%m-%d %H:%M:%S")
@@ -360,119 +472,6 @@ async def get_form(request: quart.Request) -> datastructures.MultiDict:
     return form
 
 
-R = TypeVar("R", covariant=True)
-
-
-# This is the type of functions to which we apply @committer_get
-# In other words, functions which accept CommitterSession as their first arg
-class CommitterRouteHandler(Protocol[R]):
-    """Protocol for @committer_get decorated functions."""
-
-    __name__: str
-    __doc__: str | None
-
-    def __call__(self, session: CommitterSession, *args: Any, **kwargs: Any) -> Awaitable[R]: ...
-
-
-class CommitterSession:
-    """Session with extra information about committers."""
-
-    def __init__(self, web_session: session.ClientSession) -> None:
-        self._projects: list[models.Project] | None = None
-        self._session = web_session
-
-    def __getattr__(self, name: str) -> Any:
-        # TODO: Not type safe, should subclass properly if possible
-        # For example, we can access session.no_such_attr and the type checkers won't notice
-        return getattr(self._session, name)
-
-    @property
-    def host(self) -> str:
-        request_host = quart.request.host
-        if ":" in request_host:
-            domain, port = request_host.split(":")
-            # Could be an IPv6 address, so need to check whether port is a valid integer
-            if port.isdigit():
-                return domain
-        return request_host
-
-    def only_user_releases(self, releases: Sequence[models.Release]) -> list[models.Release]:
-        return util.user_releases(self.uid, releases)
-
-    async def redirect(
-        self, route: CommitterRouteHandler[R], success: str | None = None, error: str | None = None, **kwargs: Any
-    ) -> response.Response:
-        """Redirect to a route with a success or error message."""
-        if success is not None:
-            await quart.flash(success, "success")
-        elif error is not None:
-            await quart.flash(error, "error")
-        return quart.redirect(util.as_url(route, **kwargs))
-
-    @property
-    async def user_candidate_drafts(self) -> list[models.Release]:
-        return await user.candidate_drafts(self.uid, user_projects=self._projects)
-
-    # @property
-    # async def user_committees(self) -> list[models.Committee]:
-    #     return ...
-
-    @property
-    async def user_projects(self) -> list[models.Project]:
-        if self._projects is None:
-            self._projects = await user.projects(self.uid)
-        return self._projects
-
-    @property
-    async def user_releases(self) -> list[models.Release]:
-        return await user.releases(self.uid)
-
-
-# This is the type of functions to which we apply @app_route
-# In other words, functions which accept no session
-class RouteHandler(Protocol[R]):
-    """Protocol for @app_route decorated functions."""
-
-    __name__: str
-    __doc__: str | None
-
-    def __call__(self, *args: Any, **kwargs: Any) -> Awaitable[R]: ...
-
-
-# This decorator is an adaptor between @committer_get and @app_route functions
-def committer(
-    path: str, methods: list[str] | None = None, measure_performance: bool = True
-) -> Callable[[CommitterRouteHandler[R]], RouteHandler[R]]:
-    """Decorator for committer GET routes that provides an enhanced session object."""
-
-    def decorator(func: CommitterRouteHandler[R]) -> RouteHandler[R]:
-        async def wrapper(*args: Any, **kwargs: Any) -> R:
-            web_session = await session.read()
-            if web_session is None:
-                _authentication_failed()
-
-            enhanced_session = CommitterSession(web_session)
-            return await func(enhanced_session, *args, **kwargs)
-
-        # Generate a unique endpoint name
-        endpoint = func.__module__ + "_" + func.__name__
-
-        # Set the name before applying decorators
-        wrapper.__name__ = func.__name__
-        wrapper.__doc__ = func.__doc__
-        wrapper.__annotations__["endpoint"] = endpoint
-
-        # Apply decorators in reverse order
-        decorated = auth.require(auth.Requirements.committer)(wrapper)
-        decorated = app_route(
-            path, methods=methods or ["GET"], endpoint=endpoint, measure_performance=measure_performance
-        )(decorated)
-
-        return decorated
-
-    return decorator
-
-
 def public(
     path: str, methods: list[str] | None = None, measure_performance: bool = True
 ) -> Callable[[RouteHandler[R]], RouteHandler[R]]:


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to