This is an automated email from the ASF dual-hosted git repository. brondsem pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/allura.git
commit f13327274c9def46c84d57262fabc461bde8c03f
Author: Dave Brondsema <[email protected]>
AuthorDate: Tue Oct 7 10:19:33 2025 -0400
manually wrap some long lines
---
 Allura/allura/controllers/rest.py | 41 +++++++++++++++++++++--------
 Allura/allura/tests/functional/test_auth.py | 13 ++++++---
 2 files changed, 39 insertions(+), 15 deletions(-)
diff --git a/Allura/allura/controllers/rest.py b/Allura/allura/controllers/rest.py
index 0c45bb16e..267594f9b 100644
--- a/Allura/allura/controllers/rest.py
+++ b/Allura/allura/controllers/rest.py
@@ -271,14 +271,17 @@ def validate_redirect_uri(self, client_id, redirect_uri, request, *args, **kwarg
 client = M.OAuth2ClientApp.query.get(client_id=client_id)
 return redirect_uri in client.redirect_uris
- def validate_response_type(self, client_id: str, response_type: str, client: oauthlib.oauth2.Client, request: oauthlib.common.Request, *args, **kwargs) -> bool:
+ def validate_response_type(self, client_id: str, response_type: str, client: oauthlib.oauth2.Client,
+ request: oauthlib.common.Request, *args, **kwargs) -> bool:
 res_type = M.OAuth2ClientApp.query.get(client_id=client_id).response_type
 return res_type == response_type
- def validate_scopes(self, client_id: str, scopes, client: oauthlib.oauth2.Client, request: oauthlib.common.Request, *args, **kwargs) -> bool:
+ def validate_scopes(self, client_id: str, scopes, client: oauthlib.oauth2.Client,
+ request: oauthlib.common.Request, *args, **kwargs) -> bool:
 return True
- def validate_grant_type(self, client_id: str, grant_type: str, client: oauthlib.oauth2.Client, request: oauthlib.common.Request, *args, **kwargs) -> bool:
+ def validate_grant_type(self, client_id: str, grant_type: str, client: oauthlib.oauth2.Client,
+ request: oauthlib.common.Request, *args, **kwargs) -> bool:
 return grant_type in ['authorization_code', 'refresh_token']
 def get_default_scopes(self, client_id: str, request: oauthlib.common.Request, *args, **kwargs):
@@ -302,14 +305,16 @@ def get_code_challenge_method(self, code: str, request: oauthlib.common.Request)
 authorization_code = M.OAuth2AuthorizationCode.query.get(authorization_code=code)
 return authorization_code.code_challenge_method
- def invalidate_authorization_code(self, client_id: str, code: str, request: oauthlib.common.Request, *args, **kwargs) -> None:
+ def invalidate_authorization_code(self, client_id: str, code: str,
+ request: oauthlib.common.Request, *args, **kwargs) -> None:
 M.OAuth2AuthorizationCode.query.remove({'client_id': client_id, 'authorization_code': code})
 def authenticate_client(self, request: oauthlib.common.Request, *args, **kwargs) -> bool:
 request.client = M.OAuth2ClientApp.query.get(client_id=request.client_id, client_secret=request.client_secret)
 return request.client is not None
- def validate_code(self, client_id: str, code: str, client: oauthlib.oauth2.Client, request: oauthlib.common.Request, *args, **kwargs) -> bool:
+ def validate_code(self, client_id: str, code: str, client: oauthlib.oauth2.Client,
+ request: oauthlib.common.Request, *args, **kwargs) -> bool:
 authorization = M.OAuth2AuthorizationCode.query.get(client_id=client_id, authorization_code=code)
 return authorization.expires_at >= datetime.utcnow() if authorization else False
@@ -321,12 +326,15 @@ def validate_bearer_token(self, token: str, scopes: list[str], request: oauthlib
 else:
 return False
- def validate_refresh_token(self, refresh_token: str, client: oauthlib.oauth2.Client, request: oauthlib.common.Request, *args, **kwargs) -> bool:
+ def validate_refresh_token(self, refresh_token: str, client: oauthlib.oauth2.Client,
+ request: oauthlib.common.Request, *args, **kwargs) -> bool:
 return M.OAuth2AccessToken.query.get(refresh_token=refresh_token, client_id=client.client_id) is not None
- def confirm_redirect_uri(self, client_id: str, code: str, redirect_uri: str, client: oauthlib.oauth2.Client, request: oauthlib.common.Request, *args, **kwargs) -> bool:
+ def confirm_redirect_uri(self, client_id: str, code: str, redirect_uri: str, client: oauthlib.oauth2.Client,
+ request: oauthlib.common.Request, *args, **kwargs) -> bool:
 # This method is called when the client is exchanging the authorization code for an access token.
- # If a redirect uri was provided when the authorization code was created, it must match the redirect uri provided here.
+ # If a redirect uri was provided when the authorization code was created,
+ # it must match the redirect uri provided here.
 authorization = M.OAuth2AuthorizationCode.query.get(client_id=client_id, authorization_code=code)
 return authorization.redirect_uri == redirect_uri
@@ -352,9 +360,15 @@ def save_authorization_code(self, client_id: str, code, request: oauthlib.common
 def save_bearer_token(self, token, request: oauthlib.common.Request, *args, **kwargs) -> object:
 if request.grant_type == 'authorization_code':
- user_id = M.OAuth2AuthorizationCode.query.get(client_id=request.client_id, authorization_code=request.code).user_id
+ user_id = M.OAuth2AuthorizationCode.query.get(
+ client_id=request.client_id,
+ authorization_code=request.code,
+ ).user_id
 elif request.grant_type == 'refresh_token':
- user_id = M.OAuth2AccessToken.query.get(client_id=request.client_id, refresh_token=request.refresh_token).user_id
+ user_id = M.OAuth2AccessToken.query.get(
+ client_id=request.client_id,
+ refresh_token=request.refresh_token,
+ ).user_id
 current_token = M.OAuth2AccessToken.query.get(client_id=request.client_id, user_id=user_id, is_bearer=False)
@@ -541,7 +555,12 @@ def token(self, **kwargs):
 except json.decoder.JSONDecodeError:
 request_body = decoded_body
- headers, body, status = self.server.create_token_response(uri=request.url, http_method=request.method, body=request_body, headers=request.headers)
+ headers, body, status = self.server.create_token_response(
+ uri=request.url,
+ http_method=request.method,
+ body=request_body,
+ headers=request.headers,
+ )
 response.headers.update(headers)
 response.status_int = status
 return body
diff --git a/Allura/allura/tests/functional/test_auth.py b/Allura/allura/tests/functional/test_auth.py
index 4d6c618b0..5d3e920a0 100644
--- a/Allura/allura/tests/functional/test_auth.py
+++ b/Allura/allura/tests/functional/test_auth.py
@@ -245,17 +245,20 @@ def test_login_redirect(self):
 assert r.location == 'http://localhost/'
 # redir to requested location
- r = self.app.get('/auth/', extra_environ={'username': 'test-admin'}, params={'return_to': '/p/test/?a=b'},
+ r = self.app.get('/auth/', extra_environ={'username': 'test-admin'},
+ params={'return_to': '/p/test/?a=b'},
 status=302)
 assert r.location == 'http://localhost/p/test/?a=b'
 # no redirect loop on /auth/
- r = self.app.get('/auth/', extra_environ={'username': 'test-admin'}, params={'return_to': '/auth/'},
+ r = self.app.get('/auth/', extra_environ={'username': 'test-admin'},
+ params={'return_to': '/auth/'},
 status=302)
 assert r.location == 'http://localhost/'
 # no external redirect
- r = self.app.get('/auth/', extra_environ={'username': 'test-admin'}, params={'return_to': 'http://example.com/x'},
+ r = self.app.get('/auth/', extra_environ={'username': 'test-admin'},
+ params={'return_to': 'http://example.com/x'},
 status=302)
 assert r.location == 'http://localhost/'
@@ -3510,7 +3513,9 @@ def test_navigation_with_invalid_session(self):
 self.app.set_cookie('beaker.session.id', 'invalid-session-id')
 # Navigating to a page with an invalid session id should redirect to the login page
- r = self.app.get('/auth/preferences/', extra_environ={'username': 'test-user', 'disable_auth_magic': 'True'}, status=302)
+ r = self.app.get('/auth/preferences/',
+ extra_environ={'username': 'test-user', 'disable_auth_magic': 'True'},
+ status=302)
 assert '/auth/?return_to=%2Fauth%2Fpreferences%2F' in r.location
 @mock.patch.dict(config, {'auth.reject_untracked_sessions': True})
