This is an automated email from the ASF dual-hosted git repository.

brondsem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/allura.git

commit f13327274c9def46c84d57262fabc461bde8c03f
Author: Dave Brondsema <[email protected]>
AuthorDate: Tue Oct 7 10:19:33 2025 -0400

    manually wrap some long lines
---
 Allura/allura/controllers/rest.py           | 41 +++++++++++++++++++++--------
 Allura/allura/tests/functional/test_auth.py | 13 ++++++---
 2 files changed, 39 insertions(+), 15 deletions(-)

diff --git a/Allura/allura/controllers/rest.py b/Allura/allura/controllers/rest.py
index 0c45bb16e..267594f9b 100644
--- a/Allura/allura/controllers/rest.py
+++ b/Allura/allura/controllers/rest.py
@@ -271,14 +271,17 @@ def validate_redirect_uri(self, client_id, redirect_uri, request, *args, **kwarg
         client = M.OAuth2ClientApp.query.get(client_id=client_id)
         return redirect_uri in client.redirect_uris
 
-    def validate_response_type(self, client_id: str, response_type: str, client: oauthlib.oauth2.Client, request: oauthlib.common.Request, *args, **kwargs) -> bool:
+    def validate_response_type(self, client_id: str, response_type: str, client: oauthlib.oauth2.Client,
+                               request: oauthlib.common.Request, *args, **kwargs) -> bool:
         res_type = M.OAuth2ClientApp.query.get(client_id=client_id).response_type
         return res_type == response_type
 
-    def validate_scopes(self, client_id: str, scopes, client: oauthlib.oauth2.Client, request: oauthlib.common.Request, *args, **kwargs) -> bool:
+    def validate_scopes(self, client_id: str, scopes, client: oauthlib.oauth2.Client,
+                        request: oauthlib.common.Request, *args, **kwargs) -> bool:
         return True
 
-    def validate_grant_type(self, client_id: str, grant_type: str, client: oauthlib.oauth2.Client, request: oauthlib.common.Request, *args, **kwargs) -> bool:
+    def validate_grant_type(self, client_id: str, grant_type: str, client: oauthlib.oauth2.Client,
+                            request: oauthlib.common.Request, *args, **kwargs) -> bool:
         return grant_type in ['authorization_code', 'refresh_token']
 
     def get_default_scopes(self, client_id: str, request: oauthlib.common.Request, *args, **kwargs):
@@ -302,14 +305,16 @@ def get_code_challenge_method(self, code: str, request: oauthlib.common.Request)
         authorization_code = M.OAuth2AuthorizationCode.query.get(authorization_code=code)
         return authorization_code.code_challenge_method
 
-    def invalidate_authorization_code(self, client_id: str, code: str, request: oauthlib.common.Request, *args, **kwargs) -> None:
+    def invalidate_authorization_code(self, client_id: str, code: str,
+                                      request: oauthlib.common.Request, *args, **kwargs) -> None:
         M.OAuth2AuthorizationCode.query.remove({'client_id': client_id, 'authorization_code': code})
 
     def authenticate_client(self, request: oauthlib.common.Request, *args, **kwargs) -> bool:
         request.client = M.OAuth2ClientApp.query.get(client_id=request.client_id, client_secret=request.client_secret)
         return request.client is not None
 
-    def validate_code(self, client_id: str, code: str, client: oauthlib.oauth2.Client, request: oauthlib.common.Request, *args, **kwargs) -> bool:
+    def validate_code(self, client_id: str, code: str, client: oauthlib.oauth2.Client,
+                      request: oauthlib.common.Request, *args, **kwargs) -> bool:
         authorization = M.OAuth2AuthorizationCode.query.get(client_id=client_id, authorization_code=code)
         return authorization.expires_at >= datetime.utcnow() if authorization else False
 
@@ -321,12 +326,15 @@ def validate_bearer_token(self, token: str, scopes: list[str], request: oauthlib
         else:
             return False
 
-    def validate_refresh_token(self, refresh_token: str, client: oauthlib.oauth2.Client, request: oauthlib.common.Request, *args, **kwargs) -> bool:
+    def validate_refresh_token(self, refresh_token: str, client: oauthlib.oauth2.Client,
+                               request: oauthlib.common.Request, *args, **kwargs) -> bool:
         return M.OAuth2AccessToken.query.get(refresh_token=refresh_token, client_id=client.client_id) is not None
 
-    def confirm_redirect_uri(self, client_id: str, code: str, redirect_uri: str, client: oauthlib.oauth2.Client, request: oauthlib.common.Request, *args, **kwargs) -> bool:
+    def confirm_redirect_uri(self, client_id: str, code: str, redirect_uri: str, client: oauthlib.oauth2.Client,
+                             request: oauthlib.common.Request, *args, **kwargs) -> bool:
         # This method is called when the client is exchanging the authorization code for an access token.
-        # If a redirect uri was provided when the authorization code was created, it must match the redirect uri provided here.
+        # If a redirect uri was provided when the authorization code was created,
+        # it must match the redirect uri provided here.
         authorization = M.OAuth2AuthorizationCode.query.get(client_id=client_id, authorization_code=code)
         return authorization.redirect_uri == redirect_uri
 
@@ -352,9 +360,15 @@ def save_authorization_code(self, client_id: str, code, request: oauthlib.common
 
     def save_bearer_token(self, token, request: oauthlib.common.Request, *args, **kwargs) -> object:
         if request.grant_type == 'authorization_code':
-            user_id = M.OAuth2AuthorizationCode.query.get(client_id=request.client_id, authorization_code=request.code).user_id
+            user_id = M.OAuth2AuthorizationCode.query.get(
+                client_id=request.client_id,
+                authorization_code=request.code,
+            ).user_id
         elif request.grant_type == 'refresh_token':
-            user_id = M.OAuth2AccessToken.query.get(client_id=request.client_id, refresh_token=request.refresh_token).user_id
+            user_id = M.OAuth2AccessToken.query.get(
+                client_id=request.client_id,
+                refresh_token=request.refresh_token,
+            ).user_id
 
         current_token = M.OAuth2AccessToken.query.get(client_id=request.client_id, user_id=user_id, is_bearer=False)
 
@@ -541,7 +555,12 @@ def token(self, **kwargs):
         except json.decoder.JSONDecodeError:
             request_body = decoded_body
 
-        headers, body, status = self.server.create_token_response(uri=request.url, http_method=request.method, body=request_body, headers=request.headers)
+        headers, body, status = self.server.create_token_response(
+            uri=request.url,
+            http_method=request.method,
+            body=request_body,
+            headers=request.headers,
+        )
         response.headers.update(headers)
         response.status_int = status
         return body
diff --git a/Allura/allura/tests/functional/test_auth.py b/Allura/allura/tests/functional/test_auth.py
index 4d6c618b0..5d3e920a0 100644
--- a/Allura/allura/tests/functional/test_auth.py
+++ b/Allura/allura/tests/functional/test_auth.py
@@ -245,17 +245,20 @@ def test_login_redirect(self):
         assert r.location == 'http://localhost/'
 
         # redir to requested location
-        r = self.app.get('/auth/', extra_environ={'username': 'test-admin'}, params={'return_to': '/p/test/?a=b'},
+        r = self.app.get('/auth/', extra_environ={'username': 'test-admin'},
+                         params={'return_to': '/p/test/?a=b'},
                          status=302)
         assert r.location == 'http://localhost/p/test/?a=b'
 
         # no redirect loop on /auth/
-        r = self.app.get('/auth/', extra_environ={'username': 'test-admin'}, params={'return_to': '/auth/'},
+        r = self.app.get('/auth/', extra_environ={'username': 'test-admin'},
+                         params={'return_to': '/auth/'},
                          status=302)
         assert r.location == 'http://localhost/'
 
         # no external redirect
-        r = self.app.get('/auth/', extra_environ={'username': 'test-admin'}, params={'return_to': 'http://example.com/x'},
+        r = self.app.get('/auth/', extra_environ={'username': 'test-admin'},
+                         params={'return_to': 'http://example.com/x'},
                          status=302)
         assert r.location == 'http://localhost/'
 
@@ -3510,7 +3513,9 @@ def test_navigation_with_invalid_session(self):
         self.app.set_cookie('beaker.session.id', 'invalid-session-id')
 
         # Navigating to a page with an invalid session id should redirect to the login page
-        r = self.app.get('/auth/preferences/', extra_environ={'username': 'test-user', 'disable_auth_magic': 'True'}, status=302)
+        r = self.app.get('/auth/preferences/',
+                         extra_environ={'username': 'test-user', 'disable_auth_magic': 'True'},
+                         status=302)
         assert '/auth/?return_to=%2Fauth%2Fpreferences%2F' in r.location
 
     @mock.patch.dict(config, {'auth.reject_untracked_sessions': True})

Reply via email to