This is an automated email from the ASF dual-hosted git repository. brondsem pushed a commit to branch db/8461 in repository https://gitbox.apache.org/repos/asf/allura.git
commit 50a05541e9cb7a6a652dee209277e1d5e731afdf Author: Dave Brondsema <dbronds...@slashdotmedia.com> AuthorDate: Wed Sep 7 12:45:03 2022 -0400 [#8461] convert oauth tests to not mock the oauth library, use requests_oauthlib as a helper to build requests instead --- Allura/allura/tests/decorators.py | 19 +- Allura/allura/tests/functional/test_auth.py | 296 ++++++++++++++-------------- AlluraTest/alluratest/controller.py | 18 ++ 3 files changed, 180 insertions(+), 153 deletions(-) diff --git a/Allura/allura/tests/decorators.py b/Allura/allura/tests/decorators.py index 6c561c228..34854ad0f 100644 --- a/Allura/allura/tests/decorators.py +++ b/Allura/allura/tests/decorators.py @@ -209,18 +209,29 @@ def out_audits(*messages, **kwargs): # not a decorator but use it with LogCapture() context manager -def assert_logmsg_and_no_warnings_or_errors(logs, msg): +def assert_logmsg(logs, msg, maxlevel=logging.CRITICAL+1): """ + can also use logs.check() or logs.check_present() :param testfixtures.logcapture.LogCapture logs: LogCapture() instance - :param str msg: Message to look for + :param str msg: Message substring to look for """ found_msg = False for r in logs.records: if msg in r.getMessage(): found_msg = True - if r.levelno > logging.INFO: + if r.levelno > maxlevel: raise AssertionError(f'unexpected log {r.levelname} {r.getMessage()}') - assert found_msg, 'Did not find {} in logs: {}'.format(msg, '\n'.join([r.getMessage() for r in logs.records])) + assert found_msg, \ + 'Did not find "{}" in these logs: {}'.format(msg, '\n'.join([r.getMessage() for r in logs.records])) + + +def assert_logmsg_and_no_warnings_or_errors(logs, msg): + """ + can also use logs.check() or logs.check_present() + :param testfixtures.logcapture.LogCapture logs: LogCapture() instance + :param str msg: Message substring to look for + """ + return assert_logmsg(logs, msg, maxlevel=logging.INFO) def assert_equivalent_urls(url1, url2): diff --git a/Allura/allura/tests/functional/test_auth.py b/Allura/allura/tests/functional/test_auth.py index c6b8fae11..1d15220d5 100644 --- a/Allura/allura/tests/functional/test_auth.py +++ b/Allura/allura/tests/functional/test_auth.py @@ -14,17 +14,22 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +from __future__ import annotations import calendar from base64 import b32encode from datetime import datetime, time, timedelta from time import time as time_time import json + from six.moves.urllib.parse import urlparse, parse_qs from six.moves.urllib.parse import urlencode from bson import ObjectId import re + +from testfixtures import LogCapture + from ming.orm.ormsession import ThreadLocalORMSession, session from tg import config, expose from mock import patch, Mock @@ -40,17 +45,15 @@ from alluratest.tools import ( assert_false, ) from tg import tmpl_context as c, app_globals as g -import oauth2 from allura.tests import TestController from allura.tests import decorators as td -from allura.tests.decorators import audits, out_audits -from alluratest.controller import setup_trove_categories, TestRestApiBase +from allura.tests.decorators import audits, out_audits, assert_logmsg +from alluratest.controller import setup_trove_categories, TestRestApiBase, oauth1_webtest from allura import model as M from allura.lib import plugin from allura.lib import helpers as h from allura.lib.multifactor import TotpService, RecoveryCodeService -import six def unentity(s): @@ -1854,109 +1857,55 @@ class TestOAuth(TestController): M.OAuthAccessToken.for_user(M.User.by_username('test-admin')), []) def test_interactive(self): - with mock.patch('allura.controllers.rest.oauth.Server') as Server, \ - mock.patch('allura.controllers.rest.oauth.Request') as Request: # these are the oauth2 libs - user = M.User.by_username('test-admin') - M.OAuthConsumerToken( - api_key='api_key', - user_id=user._id, - description='ctok_desc', - ) - ThreadLocalORMSession.flush_all() - Request.from_request.return_value = { - 'oauth_consumer_key': 'api_key', - 'oauth_callback': 'http://my.domain.com/callback', - } - r = self.app.post('/rest/oauth/request_token', params={}) - rtok = parse_qs(r.text)['oauth_token'][0] - r = self.app.post('/rest/oauth/authorize', - params={'oauth_token': rtok}) - r = r.forms[0].submit('yes') - assert r.location.startswith('http://my.domain.com/callback') - pin = parse_qs(urlparse(r.location).query)['oauth_verifier'][0] - Request.from_request.return_value = { - 'oauth_consumer_key': 'api_key', - 'oauth_token': rtok, - 'oauth_verifier': pin, - } - r = self.app.get('/rest/oauth/access_token') - atok = parse_qs(r.text) - assert_equal(len(atok['oauth_token']), 1) - assert_equal(len(atok['oauth_token_secret']), 1) - - # now use the tokens & secrets to make a full OAuth request: - oauth_secret = atok['oauth_token_secret'][0] - oauth_token = atok['oauth_token'][0] - consumer = oauth2.Consumer('api_key', oauth_secret) - M.OAuthConsumerToken.consumer = consumer - access_token = oauth2.Token(oauth_token, oauth_secret) - oauth_client = oauth2.Client(consumer, access_token) - # use the oauth2 lib, but intercept the request and then send it to self.app.get - with mock.patch('oauth2.httplib2.Http.request', name='hl2req') as oa2_req: - oauth_client.request('http://localhost/rest/p/test/', 'GET') - oa2url = oa2_req.call_args[0][1] - oa2url = oa2url.replace('http://localhost', '') - # print(oa2url) - oa2kwargs = oa2_req.call_args[1] - self.app.get(oa2url, headers=oa2kwargs['headers'], status=200) - self.app.get(oa2url.replace('oauth_signature=', 'removed='), headers=oa2kwargs['headers'], status=401) - - @mock.patch('allura.controllers.rest.oauth.Server') - @mock.patch('allura.controllers.rest.oauth.Request') - def test_request_token_valid(self, Request, Server): - M.OAuthConsumerToken.consumer = mock.Mock() - user = M.User.by_username('test-user') - consumer_token = M.OAuthConsumerToken( - api_key='api_key', - user_id=user._id, - ) - ThreadLocalORMSession.flush_all() - req = Request.from_request.return_value = {'oauth_consumer_key': 'api_key'} - r = self.app.post('/rest/oauth/request_token', params={'key': 'value'}) - - # dict-ify webob.EnvironHeaders - call = Request.from_request.call_args_list[0] - call[1]['headers'] = dict(call[1]['headers']) - # then check equality - assert_equal(Request.from_request.call_args_list, [ - mock.call('POST', 'http://localhost/rest/oauth/request_token', - headers={'Host': 'localhost:80', - 'Content-Type': 'application/x-www-form-urlencoded', - 'Content-Length': '9'}, - parameters={'key': 'value'}, - query_string='') - ]) - Server().verify_request.assert_called_once_with(req, consumer_token.consumer, None) - request_token = M.OAuthRequestToken.query.get(consumer_token_id=consumer_token._id) - assert_is_not_none(request_token) - assert_equal(r.text, request_token.to_string()) - - @mock.patch('allura.controllers.rest.oauth.Server') - @mock.patch('allura.controllers.rest.oauth.Request') - def test_request_token_no_consumer_token_matching(self, Request, Server): - Request.from_request.return_value = {'oauth_consumer_key': 'api_key'} - self.app.post('/rest/oauth/request_token', - params={'key': 'value'}, status=401) - - @mock.patch('allura.controllers.rest.oauth.Server') - @mock.patch('allura.controllers.rest.oauth.Request') - def test_request_token_no_consumer_token_given(self, Request, Server): - Request.from_request.return_value = {} - self.app.post('/rest/oauth/request_token', params={'key': 'value'}, status=401) - - @mock.patch('allura.controllers.rest.oauth.Server') - @mock.patch('allura.controllers.rest.oauth.Request') - def test_request_token_invalid(self, Request, Server): - Server().verify_request.side_effect = oauth2.Error('test_request_token_invalid') - M.OAuthConsumerToken.consumer = mock.Mock() - user = M.User.by_username('test-user') + user = M.User.by_username('test-admin') M.OAuthConsumerToken( api_key='api_key', + secret_key='dummy-client-secret', user_id=user._id, + description='ctok_desc', ) ThreadLocalORMSession.flush_all() - Request.from_request.return_value = {'oauth_consumer_key': 'api_key'} - self.app.post('/rest/oauth/request_token', params={'key': 'value'}, status=401) + oauth_params = dict( + client_key='api_key', + client_secret='dummy-client-secret', + callback_uri='http://my.domain.com/callback', + ) + r = self.app.post(*oauth1_webtest('/rest/oauth/request_token', oauth_params, method='POST')) + rtok = parse_qs(r.text)['oauth_token'][0] + rsecr = parse_qs(r.text)['oauth_token_secret'][0] + assert rtok + assert rsecr + r = self.app.post('/rest/oauth/authorize', + params={'oauth_token': rtok}) + r = r.forms[0].submit('yes') + assert r.location.startswith('http://my.domain.com/callback') + pin = parse_qs(urlparse(r.location).query)['oauth_verifier'][0] + assert pin + + oauth_params = dict( + client_key='api_key', + client_secret='dummy-client-secret', + resource_owner_key=rtok, + resource_owner_secret=rsecr, + verifier=pin, + ) + r = self.app.get(*oauth1_webtest('/rest/oauth/access_token', oauth_params)) + atok = parse_qs(r.text) + assert_equal(len(atok['oauth_token']), 1) + assert_equal(len(atok['oauth_token_secret']), 1) + + # now use the tokens & secrets to make a full OAuth request: + oauth_token = atok['oauth_token'][0] + oauth_secret = atok['oauth_token_secret'][0] + oaurl, oaparams, oahdrs = oauth1_webtest('/rest/p/test/', dict( + client_key='api_key', + client_secret='dummy-client-secret', + resource_owner_key=oauth_token, + resource_owner_secret=oauth_secret, + signature_type='query' + )) + self.app.get(oaurl, oaparams, oahdrs, status=200) + self.app.get(oaurl.replace('oauth_signature=', 'removed='), oaparams, oahdrs, status=401) def test_authorize_ok(self): user = M.User.by_username('test-admin') @@ -2048,22 +1997,72 @@ class TestOAuth(TestController): r = self.app.post('/rest/oauth/do_authorize', params={'yes': '1', 'oauth_token': 'api_key'}) assert r.location.startswith('http://my.domain.com/callback?myparam=foo&oauth_token=api_key&oauth_verifier=') - @mock.patch('allura.controllers.rest.oauth.Request') - def test_access_token_no_consumer(self, Request): - Request.from_request.return_value = { - 'oauth_consumer_key': 'api_key', - 'oauth_token': 'api_key', - 'oauth_verifier': 'good', - } - self.app.get('/rest/oauth/access_token', status=401) - - @mock.patch('allura.controllers.rest.oauth.Request') - def test_access_token_no_request(self, Request): - Request.from_request.return_value = { - 'oauth_consumer_key': 'api_key', - 'oauth_token': 'api_key', - 'oauth_verifier': 'good', - } + +class TestOAuthRequestToken(TestController): + + oauth_params = dict( + client_key='api_key', + client_secret='dummy-client-secret', + ) + + def test_request_token_valid(self): + user = M.User.by_username('test-user') + consumer_token = M.OAuthConsumerToken( + api_key='api_key', + secret_key='dummy-client-secret', + user_id=user._id, + ) + ThreadLocalORMSession.flush_all() + r = self.app.post(*oauth1_webtest('/rest/oauth/request_token', self.oauth_params, method='POST')) + + request_token = M.OAuthRequestToken.query.get(consumer_token_id=consumer_token._id) + assert_is_not_none(request_token) + assert_equal(r.text, request_token.to_string()) + + def test_request_token_no_consumer_token_matching(self): + with LogCapture() as logs: + self.app.post(*oauth1_webtest('/rest/oauth/request_token', self.oauth_params), status=401) + assert_logmsg(logs, 'Invalid consumer token') + + def test_request_token_no_consumer_token_given(self): + oauth_params = self.oauth_params.copy() + oauth_params['signature_type'] = 'query' # so we can more easily remove a param next + url, params, hdrs = oauth1_webtest('/rest/oauth/request_token', oauth_params) + url = url.replace('oauth_consumer_key', 'gone') + with LogCapture() as logs: + self.app.post(url, params, hdrs, status=401) + assert_logmsg(logs, 'Invalid consumer token') + + def test_request_token_invalid(self): + user = M.User.by_username('test-user') + M.OAuthConsumerToken( + api_key='api_key', + user_id=user._id, + secret_key='dummy-client-secret--INVALID', + ) + ThreadLocalORMSession.flush_all() + with LogCapture() as logs: + self.app.post(*oauth1_webtest('/rest/oauth/request_token', self.oauth_params, method='POST'), + status=401) + assert_logmsg(logs, "Invalid signature <class 'oauth2.Error'> Invalid signature.") + + +class TestOAuthAccessToken(TestController): + + oauth_params = dict( + client_key='api_key', + client_secret='dummy-client-secret', + resource_owner_key='api_key', + resource_owner_secret='dummy-token-secret', + verifier='good', + ) + + def test_access_token_no_consumer(self): + with LogCapture() as logs: + self.app.get(*oauth1_webtest('/rest/oauth/access_token', self.oauth_params), status=401) + assert_logmsg(logs, 'Invalid consumer token') + + def test_access_token_no_request(self): user = M.User.by_username('test-admin') M.OAuthConsumerToken( api_key='api_key', @@ -2071,15 +2070,11 @@ class TestOAuth(TestController): description='ctok_desc', ) ThreadLocalORMSession.flush_all() - self.app.get('/rest/oauth/access_token', status=401) - - @mock.patch('allura.controllers.rest.oauth.Request') - def test_access_token_bad_pin(self, Request): - Request.from_request.return_value = { - 'oauth_consumer_key': 'api_key', - 'oauth_token': 'api_key', - 'oauth_verifier': 'bad', - } + with LogCapture() as logs: + self.app.get(*oauth1_webtest('/rest/oauth/access_token', self.oauth_params), status=401) + assert_logmsg(logs, 'Invalid request token') + + def test_access_token_bad_pin(self): user = M.User.by_username('test-admin') ctok = M.OAuthConsumerToken( api_key='api_key', @@ -2094,21 +2089,20 @@ class TestOAuth(TestController): validation_pin='good', ) ThreadLocalORMSession.flush_all() - self.app.get('/rest/oauth/access_token', status=401) - - @mock.patch('allura.controllers.rest.oauth.Server') - @mock.patch('allura.controllers.rest.oauth.Request') - def test_access_token_bad_sig(self, Request, Server): - Request.from_request.return_value = { - 'oauth_consumer_key': 'api_key', - 'oauth_token': 'api_key', - 'oauth_verifier': 'good', - } + with LogCapture() as logs: + oauth_params = self.oauth_params.copy() + oauth_params['verifier'] = 'bad' + self.app.get(*oauth1_webtest('/rest/oauth/access_token', oauth_params), + status=401) + assert_logmsg(logs, 'Invalid verifier') + + def test_access_token_bad_sig(self): user = M.User.by_username('test-admin') ctok = M.OAuthConsumerToken( api_key='api_key', user_id=user._id, description='ctok_desc', + secret_key='dummy-client-secret', ) M.OAuthRequestToken( api_key='api_key', @@ -2116,34 +2110,38 @@ class TestOAuth(TestController): callback='http://my.domain.com/callback?myparam=foo', user_id=user._id, validation_pin='good', + secret_key='dummy-token-secret--INVALID', ) ThreadLocalORMSession.flush_all() - Server().verify_request.side_effect = oauth2.Error('test_access_token_bad_sig') - self.app.get('/rest/oauth/access_token', status=401) - - @mock.patch('allura.controllers.rest.oauth.Server') - @mock.patch('allura.controllers.rest.oauth.Request') - def test_access_token_ok(self, Request, Server): - Request.from_request.return_value = { - 'oauth_consumer_key': 'api_key', - 'oauth_token': 'api_key', - 'oauth_verifier': 'good', - } + with LogCapture() as logs: + self.app.get(*oauth1_webtest('/rest/oauth/access_token', self.oauth_params), status=401) + assert_logmsg(logs, "Invalid signature <class 'oauth2.Error'> Invalid signature.") + + def test_access_token_ok(self): user = M.User.by_username('test-admin') ctok = M.OAuthConsumerToken( api_key='api_key', + secret_key='dummy-client-secret', user_id=user._id, description='ctok_desc', ) M.OAuthRequestToken( api_key='api_key', + secret_key='dummy-token-secret', consumer_token_id=ctok._id, callback='http://my.domain.com/callback?myparam=foo', user_id=user._id, validation_pin='good', ) ThreadLocalORMSession.flush_all() - r = self.app.get('/rest/oauth/access_token') + + r = self.app.get(*oauth1_webtest('/rest/oauth/access_token', self.oauth_params)) + atok = parse_qs(r.text) + assert_equal(len(atok['oauth_token']), 1) + assert_equal(len(atok['oauth_token_secret']), 1) + + oauth_params = dict(self.oauth_params, signature_type='query') + r = self.app.get(*oauth1_webtest('/rest/oauth/access_token', oauth_params)) atok = parse_qs(r.text) assert_equal(len(atok['oauth_token']), 1) assert_equal(len(atok['oauth_token_secret']), 1) diff --git a/AlluraTest/alluratest/controller.py b/AlluraTest/alluratest/controller.py index 4c7c01bc4..e1e73082e 100644 --- a/AlluraTest/alluratest/controller.py +++ b/AlluraTest/alluratest/controller.py @@ -16,6 +16,8 @@ # under the License. """Unit and functional test suite for allura.""" +from __future__ import annotations + import os import six.moves.urllib.request import six.moves.urllib.parse @@ -37,6 +39,8 @@ import ew from ming.orm import ThreadLocalORMSession import ming.orm import pkg_resources +import requests +import requests_oauthlib from allura import model as M from allura.command import CreateTroveCategoriesCommand @@ -283,3 +287,17 @@ class TestRestApiBase(TestController): def api_delete(self, path, wrap_args=None, user='test-admin', status=None, **params): return self._api_call('DELETE', path, wrap_args, user, status, **params) + + +def oauth1_webtest(url: str, oauth_kwargs: dict, method='GET') -> tuple[str, dict, dict]: + oauth1 = requests_oauthlib.OAuth1(**oauth_kwargs) + req = requests.Request(method, f'http://localhost{url}').prepare() + oauth1(req) + return request2webtest(req) + + +def request2webtest(req: requests.PreparedRequest) -> tuple[str, dict, dict]: + url = req.url + params = {} + headers = {k: v.decode() for k,v in req.headers.items()} + return url, params, headers