This is an automated email from the ASF dual-hosted git repository. dill0wn pushed a commit to branch dw/8455-part2 in repository https://gitbox.apache.org/repos/asf/allura.git
commit 50aa084c873c6f74d5b159f19f5369f12af7d63f Author: Dillon Walls <[email protected]> AuthorDate: Thu Sep 29 14:04:06 2022 +0000 [#8455] remove @with_setup --- Allura/allura/tests/model/test_artifact.py | 493 ++++++------- Allura/allura/tests/model/test_auth.py | 823 ++++++++++----------- Allura/allura/tests/model/test_discussion.py | 966 ++++++++++++------------- Allura/allura/tests/model/test_monq.py | 4 +- Allura/allura/tests/model/test_neighborhood.py | 107 ++- Allura/allura/tests/model/test_oauth.py | 32 +- Allura/allura/tests/model/test_project.py | 325 ++++----- Allura/allura/tests/test_app.py | 295 ++++---- 8 files changed, 1449 insertions(+), 1596 deletions(-) diff --git a/Allura/allura/tests/model/test_artifact.py b/Allura/allura/tests/model/test_artifact.py index 940ae0cac..35402dd0c 100644 --- a/Allura/allura/tests/model/test_artifact.py +++ b/Allura/allura/tests/model/test_artifact.py @@ -22,7 +22,6 @@ import re from datetime import datetime from tg import tmpl_context as c -from alluratest.tools import with_setup from mock import patch import pytest from ming.orm.ormsession import ThreadLocalORMSession @@ -55,273 +54,239 @@ class Checkmessage(M.Message): Mapper.compile_all() -def setup_method(): - setup_basic_test() - setup_unit_test() - setup_with_tools() - - -def teardown_module(): - ThreadLocalORMSession.close_all() - - [email protected]_wiki -def setup_with_tools(): - h.set_context('test', 'wiki', neighborhood='Projects') - Checkmessage.query.remove({}) - WM.Page.query.remove({}) - WM.PageHistory.query.remove({}) - M.Shortlink.query.remove({}) - c.user = M.User.query.get(username='test-admin') - Checkmessage.project = c.project - Checkmessage.app_config = c.app.config - - -@with_setup(setup_method) -def test_artifact(): - pg = WM.Page(title='TestPage1') - assert pg.project == c.project - assert pg.project_id == c.project._id - assert pg.app.config == c.app.config - assert pg.app_config == c.app.config - u = M.User.query.get(username='test-user') - pr = M.ProjectRole.by_user(u, upsert=True) - ThreadLocalORMSession.flush_all() - REGISTRY.register(allura.credentials, allura.lib.security.Credentials()) - assert not security.has_access(pg, 'delete')(user=u) - pg.acl.append(M.ACE.allow(pr._id, 'delete')) - ThreadLocalORMSession.flush_all() - assert security.has_access(pg, 'delete')(user=u) - pg.acl.pop() - ThreadLocalORMSession.flush_all() - assert not security.has_access(pg, 'delete')(user=u) - - -def test_artifact_index(): - pg = WM.Page(title='TestPage1') - idx = pg.index() - assert 'title' in idx - assert 'url_s' in idx - assert 'project_id_s' in idx - assert 'mount_point_s' in idx - assert 'type_s' in idx - assert 'id' in idx - assert idx['id'] == pg.index_id() - assert 'text' in idx - assert 'TestPage' in pg.shorthand_id() - assert pg.link_text() == pg.shorthand_id() - - -@with_setup(setup_method) -def test_artifactlink(): - pg = WM.Page(title='TestPage2') - q_shortlink = M.Shortlink.query.find(dict( - project_id=c.project._id, - app_config_id=c.app.config._id, - link=pg.shorthand_id())) - assert q_shortlink.count() == 0 - - ThreadLocalORMSession.flush_all() - M.MonQTask.run_ready() - ThreadLocalORMSession.flush_all() - assert q_shortlink.count() == 1 - - assert M.Shortlink.lookup('[TestPage2]') - assert M.Shortlink.lookup('[wiki:TestPage2]') - assert M.Shortlink.lookup('[test:wiki:TestPage2]') - assert not M.Shortlink.lookup('[test:wiki:TestPage2:foo]') - assert not M.Shortlink.lookup('[Wiki:TestPage2]') - assert not M.Shortlink.lookup('[TestPage2_no_such_page]') - - pg.delete() - c.project.uninstall_app('wiki') - assert not M.Shortlink.lookup('[wiki:TestPage2]') - assert q_shortlink.count() == 0 - - -@with_setup(setup_method) -def test_gen_messageid(): - assert re.match(r'[0-9a-zA-Z]*[email protected]', - h.gen_message_id()) - - -@with_setup(setup_method) -def test_gen_messageid_with_id_set(): - oid = ObjectId() - assert re.match(r'%[email protected]' % - str(oid), h.gen_message_id(oid)) - - -@with_setup(setup_method) -def test_artifact_messageid(): - p = WM.Page(title='T') - assert re.match(r'%[email protected]' % - str(p._id), p.message_id()) - - -@with_setup(setup_method) -def test_versioning(): - pg = WM.Page(title='TestPage3') - with patch('allura.model.artifact.request', - Request.blank('/', remote_addr='1.1.1.1')): +class TestArtifact: + + def setup_method(self): + setup_basic_test() + setup_unit_test() + self.setup_with_tools() + + def teardown_class(cls): + ThreadLocalORMSession.close_all() + + @td.with_wiki + def setup_with_tools(self): + h.set_context('test', 'wiki', neighborhood='Projects') + Checkmessage.query.remove({}) + WM.Page.query.remove({}) + WM.PageHistory.query.remove({}) + M.Shortlink.query.remove({}) + c.user = M.User.query.get(username='test-admin') + Checkmessage.project = c.project + Checkmessage.app_config = c.app.config + + def test_artifact(self): + pg = WM.Page(title='TestPage1') + assert pg.project == c.project + assert pg.project_id == c.project._id + assert pg.app.config == c.app.config + assert pg.app_config == c.app.config + u = M.User.query.get(username='test-user') + pr = M.ProjectRole.by_user(u, upsert=True) + ThreadLocalORMSession.flush_all() + REGISTRY.register(allura.credentials, allura.lib.security.Credentials()) + assert not security.has_access(pg, 'delete')(user=u) + pg.acl.append(M.ACE.allow(pr._id, 'delete')) + ThreadLocalORMSession.flush_all() + assert security.has_access(pg, 'delete')(user=u) + pg.acl.pop() + ThreadLocalORMSession.flush_all() + assert not security.has_access(pg, 'delete')(user=u) + + def test_artifact_index(self): + pg = WM.Page(title='TestPage1') + idx = pg.index() + assert 'title' in idx + assert 'url_s' in idx + assert 'project_id_s' in idx + assert 'mount_point_s' in idx + assert 'type_s' in idx + assert 'id' in idx + assert idx['id'] == pg.index_id() + assert 'text' in idx + assert 'TestPage' in pg.shorthand_id() + assert pg.link_text() == pg.shorthand_id() + + def test_artifactlink(self): + pg = WM.Page(title='TestPage2') + q_shortlink = M.Shortlink.query.find(dict( + project_id=c.project._id, + app_config_id=c.app.config._id, + link=pg.shorthand_id())) + assert q_shortlink.count() == 0 + + ThreadLocalORMSession.flush_all() + M.MonQTask.run_ready() + ThreadLocalORMSession.flush_all() + assert q_shortlink.count() == 1 + + assert M.Shortlink.lookup('[TestPage2]') + assert M.Shortlink.lookup('[wiki:TestPage2]') + assert M.Shortlink.lookup('[test:wiki:TestPage2]') + assert not M.Shortlink.lookup('[test:wiki:TestPage2:foo]') + assert not M.Shortlink.lookup('[Wiki:TestPage2]') + assert not M.Shortlink.lookup('[TestPage2_no_such_page]') + + pg.delete() + c.project.uninstall_app('wiki') + assert not M.Shortlink.lookup('[wiki:TestPage2]') + assert q_shortlink.count() == 0 + + def test_gen_messageid(self): + assert re.match(r'[0-9a-zA-Z]*[email protected]', + h.gen_message_id()) + + def test_gen_messageid_with_id_set(self): + oid = ObjectId() + assert re.match(r'%[email protected]' % + str(oid), h.gen_message_id(oid)) + + def test_artifact_messageid(self): + p = WM.Page(title='T') + assert re.match(r'%[email protected]' % + str(p._id), p.message_id()) + + def test_versioning(self): + pg = WM.Page(title='TestPage3') + with patch('allura.model.artifact.request', Request.blank('/', remote_addr='1.1.1.1')): + pg.commit() + ThreadLocalORMSession.flush_all() + pg.text = 'Here is some text' pg.commit() - ThreadLocalORMSession.flush_all() - pg.text = 'Here is some text' - pg.commit() - ThreadLocalORMSession.flush_all() - ss = pg.get_version(1) - assert ss.author.logged_ip == '1.1.1.1' - assert ss.index()['is_history_b'] - assert ss.shorthand_id() == pg.shorthand_id() + '#1' - assert ss.title == pg.title - assert ss.text != pg.text - ss = pg.get_version(-1) - assert ss.index()['is_history_b'] - assert ss.shorthand_id() == pg.shorthand_id() + '#2' - assert ss.title == pg.title - assert ss.text == pg.text - pytest.raises(IndexError, pg.get_version, 42) - pg.revert(1) - pg.commit() - ThreadLocalORMSession.flush_all() - assert ss.text != pg.text - assert pg.history().count() == 3 - - -@with_setup(setup_method) -def test_messages_unknown_lookup(): - from bson import ObjectId - m = Checkmessage() - m.author_id = ObjectId() # something new - assert isinstance(m.author(), M.User), type(m.author()) - assert m.author() == M.User.anonymous() - - -@with_setup(setup_method) -@patch('allura.model.artifact.datetime') -def test_last_updated(_datetime): - c.project.last_updated = datetime(2014, 1, 1) - _datetime.utcnow.return_value = datetime(2014, 1, 2) - WM.Page(title='TestPage1') - ThreadLocalORMSession.flush_all() - assert c.project.last_updated == datetime(2014, 1, 2) - - -@with_setup(setup_method) -@patch('allura.model.artifact.datetime') -def test_last_updated_disabled(_datetime): - c.project.last_updated = datetime(2014, 1, 1) - _datetime.utcnow.return_value = datetime(2014, 1, 2) - try: - M.artifact_orm_session._get().skip_last_updated = True + ThreadLocalORMSession.flush_all() + ss = pg.get_version(1) + assert ss.author.logged_ip == '1.1.1.1' + assert ss.index()['is_history_b'] + assert ss.shorthand_id() == pg.shorthand_id() + '#1' + assert ss.title == pg.title + assert ss.text != pg.text + ss = pg.get_version(-1) + assert ss.index()['is_history_b'] + assert ss.shorthand_id() == pg.shorthand_id() + '#2' + assert ss.title == pg.title + assert ss.text == pg.text + pytest.raises(IndexError, pg.get_version, 42) + pg.revert(1) + pg.commit() + ThreadLocalORMSession.flush_all() + assert ss.text != pg.text + assert pg.history().count() == 3 + + def test_messages_unknown_lookup(self): + from bson import ObjectId + m = Checkmessage() + m.author_id = ObjectId() # something new + assert isinstance(m.author(), M.User), type(m.author()) + assert m.author() == M.User.anonymous() + + @patch('allura.model.artifact.datetime') + def test_last_updated(self, _datetime): + c.project.last_updated = datetime(2014, 1, 1) + _datetime.utcnow.return_value = datetime(2014, 1, 2) WM.Page(title='TestPage1') ThreadLocalORMSession.flush_all() - assert c.project.last_updated == datetime(2014, 1, 1) - finally: - M.artifact_orm_session._get().skip_last_updated = False - - -@with_setup(setup_method) -def test_get_discussion_thread_dupe(): - artif = WM.Page(title='TestSomeArtifact') - thr1 = artif.get_discussion_thread()[0] - thr1.post('thr1-post1') - thr1.post('thr1-post2') - thr2 = M.Thread.new(ref_id=thr1.ref_id) - thr2.post('thr2-post1') - thr2.post('thr2-post2') - thr2.post('thr2-post3') - thr3 = M.Thread.new(ref_id=thr1.ref_id) - thr3.post('thr3-post1') - thr4 = M.Thread.new(ref_id=thr1.ref_id) - - thread_q = M.Thread.query.find(dict(ref_id=artif.index_id())) - assert thread_q.count() == 4 - - thread = artif.get_discussion_thread()[0] # force cleanup - threads = thread_q.all() - assert len(threads) == 1 - assert len(thread.posts) == 6 - assert not any(p.deleted for p in thread.posts) # normal thread deletion propagates to children, make sure that doesn't happen - - -def test_snapshot_clear_user_data(): - s = M.Snapshot(author={'username': 'johndoe', - 'display_name': 'John Doe', - 'logged_ip': '1.2.3.4'}) - s.clear_user_data() - assert s.author == {'username': '', + assert c.project.last_updated == datetime(2014, 1, 2) + + @patch('allura.model.artifact.datetime') + def test_last_updated_disabled(self, _datetime): + c.project.last_updated = datetime(2014, 1, 1) + _datetime.utcnow.return_value = datetime(2014, 1, 2) + try: + M.artifact_orm_session._get().skip_last_updated = True + WM.Page(title='TestPage1') + ThreadLocalORMSession.flush_all() + assert c.project.last_updated == datetime(2014, 1, 1) + finally: + M.artifact_orm_session._get().skip_last_updated = False + + def test_get_discussion_thread_dupe(self): + artif = WM.Page(title='TestSomeArtifact') + thr1 = artif.get_discussion_thread()[0] + thr1.post('thr1-post1') + thr1.post('thr1-post2') + thr2 = M.Thread.new(ref_id=thr1.ref_id) + thr2.post('thr2-post1') + thr2.post('thr2-post2') + thr2.post('thr2-post3') + thr3 = M.Thread.new(ref_id=thr1.ref_id) + thr3.post('thr3-post1') + thr4 = M.Thread.new(ref_id=thr1.ref_id) + + thread_q = M.Thread.query.find(dict(ref_id=artif.index_id())) + assert thread_q.count() == 4 + + thread = artif.get_discussion_thread()[0] # force cleanup + threads = thread_q.all() + assert len(threads) == 1 + assert len(thread.posts) == 6 + # normal thread deletion propagates to children, make sure that doesn't happen + assert not any(p.deleted for p in thread.posts) + + def test_snapshot_clear_user_data(self): + s = M.Snapshot(author={'username': 'johndoe', + 'display_name': 'John Doe', + 'logged_ip': '1.2.3.4'}) + s.clear_user_data() + assert s.author == {'username': '', 'display_name': '', 'logged_ip': None, - 'id': None, - } - - -@with_setup(setup_method) -def test_snapshot_from_username(): - s = M.Snapshot(author={'username': 'johndoe', - 'display_name': 'John Doe', - 'logged_ip': '1.2.3.4'}) - s = M.Snapshot(author={'username': 'johnsmith', - 'display_name': 'John Doe', - 'logged_ip': '1.2.3.4'}) - ThreadLocalORMSession.flush_all() - assert len(M.Snapshot.from_username('johndoe')) == 1 - - -def test_feed_clear_user_data(): - f = M.Feed(author_name='John Doe', + 'id': None} + + def test_snapshot_from_username(self): + s = M.Snapshot(author={'username': 'johndoe', + 'display_name': 'John Doe', + 'logged_ip': '1.2.3.4'}) + s = M.Snapshot(author={'username': 'johnsmith', + 'display_name': 'John Doe', + 'logged_ip': '1.2.3.4'}) + ThreadLocalORMSession.flush_all() + assert len(M.Snapshot.from_username('johndoe')) == 1 + + def test_feed_clear_user_data(self): + f = M.Feed(author_name='John Doe', + author_link='/u/johndoe/', + title='Something') + f.clear_user_data() + assert f.author_name == '' + assert f.author_link == '' + assert f.title == 'Something' + + f = M.Feed(author_name='John Doe', + author_link='/u/johndoe/', + title='Home Page modified by John Doe') + f.clear_user_data() + assert f.author_name == '' + assert f.author_link == '' + assert f.title == 'Home Page modified by <REDACTED>' + + def test_feed_from_username(self): + M.Feed(author_name='John Doe', author_link='/u/johndoe/', title='Something') - f.clear_user_data() - assert f.author_name == '' - assert f.author_link == '' - assert f.title == 'Something' - - f = M.Feed(author_name='John Doe', - author_link='/u/johndoe/', - title='Home Page modified by John Doe') - f.clear_user_data() - assert f.author_name == '' - assert f.author_link == '' - assert f.title == 'Home Page modified by <REDACTED>' - - -@with_setup(setup_method) -def test_feed_from_username(): - M.Feed(author_name='John Doe', - author_link='/u/johndoe/', - title='Something') - M.Feed(author_name='John Smith', - author_link='/u/johnsmith/', - title='Something') - ThreadLocalORMSession.flush_all() - assert len(M.Feed.from_username('johndoe')) == 1 - - -@with_setup(setup_method) -def test_subscribed(): - pg = WM.Page(title='TestPage4a') - assert pg.subscribed(include_parents=True) # tool is subscribed to admins by default - assert not pg.subscribed(include_parents=False) - - -@with_setup(setup_method) -def test_subscribed_no_tool_sub(): - pg = WM.Page(title='TestPage4b') - M.Mailbox.unsubscribe(user_id=c.user._id, - project_id=c.project._id, - app_config_id=c.app.config._id) - pg.subscribe() - assert pg.subscribed(include_parents=True) - assert pg.subscribed(include_parents=False) - - -@with_setup(setup_method) -def test_not_subscribed(): - pg = WM.Page(title='TestPage4c') - M.Mailbox.unsubscribe(user_id=c.user._id, - project_id=c.project._id, - app_config_id=c.app.config._id) - assert not pg.subscribed(include_parents=True) - assert not pg.subscribed(include_parents=False) + M.Feed(author_name='John Smith', + author_link='/u/johnsmith/', + title='Something') + ThreadLocalORMSession.flush_all() + assert len(M.Feed.from_username('johndoe')) == 1 + + def test_subscribed(self): + pg = WM.Page(title='TestPage4a') + assert pg.subscribed(include_parents=True) # tool is subscribed to admins by default + assert not pg.subscribed(include_parents=False) + + def test_subscribed_no_tool_sub(self): + pg = WM.Page(title='TestPage4b') + M.Mailbox.unsubscribe(user_id=c.user._id, + project_id=c.project._id, + app_config_id=c.app.config._id) + pg.subscribe() + assert pg.subscribed(include_parents=True) + assert pg.subscribed(include_parents=False) + + def test_not_subscribed(self): + pg = WM.Page(title='TestPage4c') + M.Mailbox.unsubscribe(user_id=c.user._id, + project_id=c.project._id, + app_config_id=c.app.config._id) + assert not pg.subscribed(include_parents=True) + assert not pg.subscribed(include_parents=False) diff --git a/Allura/allura/tests/model/test_auth.py b/Allura/allura/tests/model/test_auth.py index b4e5e80de..7b6364938 100644 --- a/Allura/allura/tests/model/test_auth.py +++ b/Allura/allura/tests/model/test_auth.py @@ -22,15 +22,7 @@ Model tests for auth import textwrap from datetime import datetime, timedelta -from alluratest.tools import ( - with_setup, - assert_equal, - assert_not_equal, - assert_true, - assert_not_in, - assert_in, -) -from tg import tmpl_context as c, app_globals as g, request +from tg import tmpl_context as c, app_globals as g, request as r from webob import Request from mock import patch, Mock @@ -41,437 +33,404 @@ from allura import model as M from allura.lib import helpers as h from allura.lib import plugin from allura.tests import decorators as td -from alluratest.controller import setup_basic_test, setup_global_objects, setup_functional_test +from alluratest.controller import setup_basic_test, setup_global_objects, setup_functional_test, setup_unit_test from alluratest.pytest_helpers import with_nose_compatibility -def setup_method(): - setup_basic_test() - ThreadLocalORMSession.close_all() - setup_global_objects() - - -@with_setup(setup_method) -def test_email_address(): - addr = M.EmailAddress(email='[email protected]', - claimed_by_user_id=c.user._id) - ThreadLocalORMSession.flush_all() - assert addr.claimed_by_user() == c.user - addr2 = M.EmailAddress.create('[email protected]') - addr3 = M.EmailAddress.create('[email protected]') - - # Duplicate emails are allowed, until the email is confirmed - assert addr3 is not addr - - assert addr2 is not addr - assert addr2 - addr4 = M.EmailAddress.create('[email protected]') - assert addr4 is not addr2 - - assert addr is c.user.address_object('[email protected]') - c.user.claim_address('[email protected]') - assert '[email protected]' in c.user.email_addresses - - -@with_setup(setup_method) -def test_email_address_lookup_helpers(): - addr = M.EmailAddress.create('[email protected]') - nobody = M.EmailAddress.create('[email protected]') - ThreadLocalORMSession.flush_all() - assert addr.email == '[email protected]' - - assert M.EmailAddress.get(email='[email protected]') == addr - assert M.EmailAddress.get(email='[email protected]') == addr - assert M.EmailAddress.get(email='[email protected]') == None - assert M.EmailAddress.get(email=None) == None - assert M.EmailAddress.get(email='[email protected]') == nobody - # invalid email returns None, but not [email protected] as before - assert M.EmailAddress.get(email='invalid') == None - - assert M.EmailAddress.find(dict(email='[email protected]')).all() == [addr] - assert M.EmailAddress.find(dict(email='[email protected]')).all() == [addr] - assert M.EmailAddress.find(dict(email='[email protected]')).all() == [] - assert M.EmailAddress.find(dict(email=None)).all() == [] - assert M.EmailAddress.find(dict(email='[email protected]')).all() == [nobody] - # invalid email returns empty query, but not [email protected] as before - assert M.EmailAddress.find(dict(email='invalid')).all() == [] - - -@with_setup(setup_method) -def test_email_address_canonical(): - assert (M.EmailAddress.canonical('[email protected]') == - '[email protected]') - assert (M.EmailAddress.canonical('[email protected]') == - '[email protected]') - assert (M.EmailAddress.canonical('I Am Nobody <[email protected]>') == - '[email protected]') - assert (M.EmailAddress.canonical(' [email protected]\t') == - '[email protected]') - assert (M.EmailAddress.canonical('I Am@Nobody <[email protected]> ') == - '[email protected]') - assert (M.EmailAddress.canonical(' No@body <no@[email protected]> ') == - 'no@[email protected]') - assert (M.EmailAddress.canonical('no@[email protected]') == - 'no@[email protected]') - assert M.EmailAddress.canonical('invalid') == None - -@with_setup(setup_method) -def test_email_address_send_verification_link(): - addr = M.EmailAddress(email='[email protected]', - claimed_by_user_id=c.user._id) - - addr.send_verification_link() - - with patch('allura.tasks.mail_tasks.smtp_client._client') as _client: - M.MonQTask.run_ready() - return_path, rcpts, body = _client.sendmail.call_args[0] - assert rcpts == ['[email protected]'] - - -@with_setup(setup_method) [email protected]_user_project('test-admin') -def test_user(): - assert c.user.url() .endswith('/u/test-admin/') - assert c.user.script_name .endswith('/u/test-admin/') - assert ({p.shortname for p in c.user.my_projects()} == - {'test', 'test2', 'u/test-admin', 'adobe-1', '--init--'}) - # delete one of the projects and make sure it won't appear in my_projects() - p = M.Project.query.get(shortname='test2') - p.deleted = True - ThreadLocalORMSession.flush_all() - assert ({p.shortname for p in c.user.my_projects()} == - {'test', 'u/test-admin', 'adobe-1', '--init--'}) - u = M.User.register(dict( - username='nosetest-user')) - ThreadLocalORMSession.flush_all() - assert u.reg_date - assert u.private_project().shortname == 'u/nosetest-user' - roles = g.credentials.user_roles( - u._id, project_id=u.private_project().root_project._id) - assert len(roles) == 3, roles - u.set_password('foo') - provider = plugin.LocalAuthenticationProvider(Request.blank('/')) - assert provider._validate_password(u, 'foo') - assert not provider._validate_password(u, 'foobar') - u.set_password('foobar') - assert provider._validate_password(u, 'foobar') - assert not provider._validate_password(u, 'foo') - - -@with_setup(setup_method) -def test_user_project_creates_on_demand(): - u = M.User.register(dict(username='foobar123'), make_project=False) - ThreadLocalORMSession.flush_all() - assert not M.Project.query.get(shortname='u/foobar123') - assert u.private_project() - assert M.Project.query.get(shortname='u/foobar123') - - -@with_setup(setup_method) -def test_user_project_already_deleted_creates_on_demand(): - u = M.User.register(dict(username='foobar123'), make_project=True) - p = M.Project.query.get(shortname='u/foobar123') - p.deleted = True - ThreadLocalORMSession.flush_all() - assert not M.Project.query.get(shortname='u/foobar123', deleted=False) - assert u.private_project() - ThreadLocalORMSession.flush_all() - assert M.Project.query.get(shortname='u/foobar123', deleted=False) - - -@with_setup(setup_method) -def test_user_project_does_not_create_on_demand_for_disabled_user(): - u = M.User.register( - dict(username='foobar123', disabled=True), make_project=False) - ThreadLocalORMSession.flush_all() - assert not u.private_project() - assert not M.Project.query.get(shortname='u/foobar123') - - -@with_setup(setup_method) -def test_user_project_does_not_create_on_demand_for_anonymous_user(): - u = M.User.anonymous() - ThreadLocalORMSession.flush_all() - assert not u.private_project() - assert not M.Project.query.get(shortname='u/anonymous') - assert not M.Project.query.get(shortname='u/*anonymous') - - -@with_setup(setup_method) -@patch('allura.model.auth.log') -def test_user_by_email_address(log): - u1 = M.User.register(dict(username='abc1'), make_project=False) - u2 = M.User.register(dict(username='abc2'), make_project=False) - addr1 = M.EmailAddress(email='[email protected]', confirmed=True, - claimed_by_user_id=u1._id) - addr2 = M.EmailAddress(email='[email protected]', confirmed=True, - claimed_by_user_id=u2._id) - # both users are disabled - u1.disabled, u2.disabled = True, True - ThreadLocalORMSession.flush_all() - assert M.User.by_email_address('[email protected]') == None - assert log.warn.call_count == 0 - - # only u2 is active - u1.disabled, u2.disabled = True, False - ThreadLocalORMSession.flush_all() - assert M.User.by_email_address('[email protected]') == u2 - assert log.warn.call_count == 0 - - # both are active - u1.disabled, u2.disabled = False, False - ThreadLocalORMSession.flush_all() - assert M.User.by_email_address('[email protected]') in [u1, u2] - assert log.warn.call_count == 1 - - # invalid email returns None, but not user which claimed - # [email protected] as before - nobody = M.EmailAddress(email='[email protected]', confirmed=True, - claimed_by_user_id=u1._id) - ThreadLocalORMSession.flush_all() - assert M.User.by_email_address('[email protected]') == u1 - assert M.User.by_email_address('invalid') == None - - -def test_user_equality(): - assert M.User.by_username('test-user') == M.User.by_username('test-user') - assert M.User.anonymous() == M.User.anonymous() - assert M.User.by_username('*anonymous') == M.User.anonymous() - - assert M.User.by_username('test-user') != M.User.by_username('test-admin') - assert M.User.by_username('test-user') != M.User.anonymous() - assert M.User.anonymous() != None - assert M.User.anonymous() != 12345 - assert M.User.anonymous() != M.User() - - -def test_user_hash(): - assert M.User.by_username('test-user') in {M.User.by_username('test-user')} - assert M.User.anonymous() in {M.User.anonymous()} - assert M.User.by_username('*anonymous') in {M.User.anonymous()} - - assert M.User.by_username('test-user') not in {M.User.by_username('test-admin')} - assert M.User.anonymous() not in {M.User.by_username('test-admin')} - assert M.User.anonymous() not in {0, None} - - -@with_setup(setup_method) -def test_project_role(): - role = M.ProjectRole(project_id=c.project._id, name='test_role') - M.ProjectRole.by_user(c.user, upsert=True).roles.append(role._id) - ThreadLocalORMSession.flush_all() - roles = g.credentials.user_roles( - c.user._id, project_id=c.project.root_project._id) - roles_ids = [r['_id'] for r in roles] - roles = M.ProjectRole.query.find({'_id': {'$in': roles_ids}}) - for pr in roles: - assert pr.display() - pr.special - assert pr.user in (c.user, None, M.User.anonymous()) - - -@with_setup(setup_method) -def test_default_project_roles(): - roles = { - pr.name: pr - for pr in M.ProjectRole.query.find(dict( - project_id=c.project._id)).all() - if pr.name} - assert 'Admin' in list(roles.keys()), list(roles.keys()) - assert 'Developer' in list(roles.keys()), list(roles.keys()) - assert 'Member' in list(roles.keys()), list(roles.keys()) - assert roles['Developer']._id in roles['Admin'].roles - assert roles['Member']._id in roles['Developer'].roles - - # There're 1 user assigned to project, represented by - # relational (vs named) ProjectRole's - assert len(roles) == M.ProjectRole.query.find(dict( - project_id=c.project._id)).count() - 1 - - -@with_setup(setup_method) -def test_email_address_claimed_by_user(): - addr = M.EmailAddress(email='[email protected]', - claimed_by_user_id=c.user._id) - c.user.disabled = True - ThreadLocalORMSession.flush_all() - assert addr.claimed_by_user() is None - - -@with_setup(setup_method) [email protected]_user_project('test-admin') -def test_user_projects_by_role(): - assert ({p.shortname for p in c.user.my_projects()} == - {'test', 'test2', 'u/test-admin', 'adobe-1', '--init--'}) - assert ({p.shortname for p in c.user.my_projects_by_role_name('Admin')} == - {'test', 'test2', 'u/test-admin', 'adobe-1', '--init--'}) - # Remove admin access from c.user to test2 project - project = M.Project.query.get(shortname='test2') - admin_role = M.ProjectRole.by_name('Admin', project) - developer_role = M.ProjectRole.by_name('Developer', project) - user_role = M.ProjectRole.by_user(c.user, project=project, upsert=True) - user_role.roles.remove(admin_role._id) - user_role.roles.append(developer_role._id) - ThreadLocalORMSession.flush_all() - g.credentials.clear() - assert ({p.shortname for p in c.user.my_projects()} == - {'test', 'test2', 'u/test-admin', 'adobe-1', '--init--'}) - assert ({p.shortname for p in c.user.my_projects_by_role_name('Admin')} == - {'test', 'u/test-admin', 'adobe-1', '--init--'}) - - [email protected]_user_project('test-admin') -@with_setup(setup_method) -def test_user_projects_unnamed(): - """ - Confirm that spurious ProjectRoles associating a user with - a project to which they do not belong to any named group - don't cause the user to count as a member of the project. - """ - sub1 = M.Project.query.get(shortname='test/sub1') - M.ProjectRole( - user_id=c.user._id, - project_id=sub1._id) - ThreadLocalORMSession.flush_all() - project_names = [p.shortname for p in c.user.my_projects()] - assert 'test/sub1' not in project_names - assert 'test' in project_names - - [email protected](g, 'user_message_max_messages', 3) -def test_check_sent_user_message_times(): - user1 = M.User.register(dict(username='test-user'), make_project=False) - time1 = datetime.utcnow() - timedelta(minutes=30) - time2 = datetime.utcnow() - timedelta(minutes=45) - time3 = datetime.utcnow() - timedelta(minutes=70) - user1.sent_user_message_times = [time1, time2, time3] - assert user1.can_send_user_message() - assert len(user1.sent_user_message_times) == 2 - user1.sent_user_message_times.append( - datetime.utcnow() - timedelta(minutes=15)) - assert not user1.can_send_user_message() - - -@with_setup(setup_method) [email protected]_user_project('test-admin') -def test_user_track_active(): - # without this session flushing inside track_active raises Exception - setup_functional_test() - c.user = M.User.by_username('test-admin') - - assert c.user.last_access['session_date'] == None - assert c.user.last_access['session_ip'] == None - assert c.user.last_access['session_ua'] == None - - req = Mock(headers={'User-Agent': 'browser'}, remote_addr='addr') - c.user.track_active(req) - c.user = M.User.by_username(c.user.username) - assert c.user.last_access['session_date'] != None - assert c.user.last_access['session_ip'] == 'addr' - assert c.user.last_access['session_ua'] == 'browser' - - # ensure that session activity tracked with a whole-day granularity - prev_date = c.user.last_access['session_date'] - c.user.track_active(req) - c.user = M.User.by_username(c.user.username) - assert c.user.last_access['session_date'] == prev_date - yesterday = datetime.utcnow() - timedelta(1) - c.user.last_access['session_date'] = yesterday - session(c.user).flush(c.user) - c.user.track_active(req) - c.user = M.User.by_username(c.user.username) - assert c.user.last_access['session_date'] > yesterday - - # ...or if IP or User Agent has changed - req.remote_addr = 'new addr' - c.user.track_active(req) - c.user = M.User.by_username(c.user.username) - assert c.user.last_access['session_ip'] == 'new addr' - assert c.user.last_access['session_ua'] == 'browser' - req.headers['User-Agent'] = 'new browser' - c.user.track_active(req) - c.user = M.User.by_username(c.user.username) - assert c.user.last_access['session_ip'] == 'new addr' - assert c.user.last_access['session_ua'] == 'new browser' - - -@with_setup(setup_method) -def test_user_index(): - c.user.email_addresses = ['email1', 'email2'] - c.user.set_pref('email_address', 'email2') - idx = c.user.index() - assert idx['id'] == c.user.index_id() - assert idx['title'] == 'User test-admin' - assert idx['type_s'] == 'User' - assert idx['username_s'] == 'test-admin' - assert idx['email_addresses_t'] == 'email1 email2' - assert idx['email_address_s'] == 'email2' - assert 'last_password_updated_dt' in idx - assert idx['disabled_b'] == False - assert 'results_per_page_i' in idx - assert 'email_format_s' in idx - assert 'disable_user_messages_b' in idx - assert idx['display_name_t'] == 'Test Admin' - assert idx['sex_s'] == 'Unknown' - assert 'birthdate_dt' in idx - assert 'localization_s' in idx - assert 'timezone_s' in idx - assert 'socialnetworks_t' in idx - assert 'telnumbers_t' in idx - assert 'skypeaccount_s' in idx - assert 'webpages_t' in idx - assert 'skills_t' in idx - assert 'last_access_login_date_dt' in idx - assert 'last_access_login_ip_s' in idx - assert 'last_access_login_ua_t' in idx - assert 'last_access_session_date_dt' in idx - assert 'last_access_session_ip_s' in idx - assert 'last_access_session_ua_t' in idx - # provided bby auth provider - assert 'user_registration_date_dt' in idx - - -@with_setup(setup_method) -def test_user_index_none_values(): - c.user.email_addresses = [None] - c.user.set_pref('telnumbers', [None]) - c.user.set_pref('webpages', [None]) - idx = c.user.index() - assert idx['email_addresses_t'] == '' - assert idx['telnumbers_t'] == '' - assert idx['webpages_t'] == '' - - -@with_setup(setup_method) -def test_user_backfill_login_details(): - with h.push_config(request, user_agent='TestBrowser/55'): - # these shouldn't match - h.auditlog_user('something happened') - h.auditlog_user('blah blah Password changed') - with h.push_config(request, user_agent='TestBrowser/56'): - # these should all match, but only one entry created for this ip/ua - h.auditlog_user('Account activated') - h.auditlog_user('Successful login') - h.auditlog_user('Password changed') - with h.push_config(request, user_agent='TestBrowser/57'): - # this should match too - h.auditlog_user('Set up multifactor TOTP') - ThreadLocalORMSession.flush_all() - - auth_provider = plugin.AuthenticationProvider.get(None) - c.user.backfill_login_details(auth_provider) - - details = M.UserLoginDetails.query.find({'user_id': c.user._id}).sort('ua').all() - assert len(details) == 2, details - assert details[0].ip == '127.0.0.1' - assert details[0].ua == 'TestBrowser/56' - assert details[1].ip == '127.0.0.1' - assert details[1].ua == 'TestBrowser/57' +class TestAuth: + + def setup_method(self): + setup_basic_test() + setup_global_objects() + + def test_email_address(self): + addr = M.EmailAddress(email='[email protected]', + claimed_by_user_id=c.user._id) + ThreadLocalORMSession.flush_all() + assert addr.claimed_by_user() == c.user + addr2 = M.EmailAddress.create('[email protected]') + addr3 = M.EmailAddress.create('[email protected]') + ThreadLocalORMSession.flush_all() + + # Duplicate emails are allowed, until the email is confirmed + assert addr3 is not addr + + assert addr2 is not addr + assert addr2 + addr4 = M.EmailAddress.create('[email protected]') + assert addr4 is not addr2 + + assert addr is c.user.address_object('[email protected]') + c.user.claim_address('[email protected]') + assert '[email protected]' in c.user.email_addresses + + def selftest_email_address_lookup_helpers(): + addr = M.EmailAddress.create('[email protected]') + nobody = M.EmailAddress.create('[email protected]') + ThreadLocalORMSession.flush_all() + assert addr.email == '[email protected]' + + assert M.EmailAddress.get(email='[email protected]') == addr + assert M.EmailAddress.get(email='[email protected]') == addr + assert M.EmailAddress.get(email='[email protected]') is None + assert M.EmailAddress.get(email=None) is None + assert M.EmailAddress.get(email='[email protected]') == nobody + # invalid email returns None, but not [email protected] as before + assert M.EmailAddress.get(email='invalid') is None + + assert M.EmailAddress.find(dict(email='[email protected]')).all() == [addr] + assert M.EmailAddress.find(dict(email='[email protected]')).all() == [addr] + assert M.EmailAddress.find(dict(email='[email protected]')).all() == [] + assert M.EmailAddress.find(dict(email=None)).all() == [] + assert M.EmailAddress.find(dict(email='[email protected]')).all() == [nobody] + # invalid email returns empty query, but not [email protected] as before + assert M.EmailAddress.find(dict(email='invalid')).all() == [] + + def test_email_address_canonical(self): + assert M.EmailAddress.canonical('[email protected]') == \ + '[email protected]' + assert M.EmailAddress.canonical('[email protected]') == \ + '[email protected]' + assert M.EmailAddress.canonical('I Am Nobody <[email protected]>') == \ + '[email protected]' + assert M.EmailAddress.canonical(' [email protected]\t') == \ + '[email protected]' + assert M.EmailAddress.canonical('I Am@Nobody <[email protected]> ') == \ + '[email protected]' + assert M.EmailAddress.canonical(' No@body <no@[email protected]> ') == \ + 'no@[email protected]' + assert M.EmailAddress.canonical('no@[email protected]') == \ + 'no@[email protected]' + assert M.EmailAddress.canonical('invalid') is None + + def test_email_address_send_verification_link(self): + addr = M.EmailAddress(email='[email protected]', + claimed_by_user_id=c.user._id) + + addr.send_verification_link() + + with patch('allura.tasks.mail_tasks.smtp_client._client') as _client: + M.MonQTask.run_ready() + return_path, rcpts, body = _client.sendmail.call_args[0] + assert rcpts == ['[email protected]'] + + @td.with_user_project('test-admin') + def test_user(self): + assert c.user.url() .endswith('/u/test-admin/') + assert c.user.script_name .endswith('/u/test-admin/') + assert ({p.shortname for p in c.user.my_projects()} == + {'test', 'test2', 'u/test-admin', 'adobe-1', '--init--'}) + # delete one of the projects and make sure it won't appear in my_projects() + p = M.Project.query.get(shortname='test2') + p.deleted = True + ThreadLocalORMSession.flush_all() + assert ({p.shortname for p in c.user.my_projects()} == + {'test', 'u/test-admin', 'adobe-1', '--init--'}) + u = M.User.register(dict( + username='nosetest-user')) + ThreadLocalORMSession.flush_all() + assert u.reg_date + assert u.private_project().shortname == 'u/nosetest-user' + roles = g.credentials.user_roles( + u._id, project_id=u.private_project().root_project._id) + assert len(roles) == 3, roles + u.set_password('foo') + provider = plugin.LocalAuthenticationProvider(Request.blank('/')) + assert provider._validate_password(u, 'foo') + assert not provider._validate_password(u, 'foobar') + u.set_password('foobar') + assert provider._validate_password(u, 'foobar') + assert not provider._validate_password(u, 'foo') + + def test_user_project_creates_on_demand(self): + u = M.User.register(dict(username='foobar123'), make_project=False) + ThreadLocalORMSession.flush_all() + assert not M.Project.query.get(shortname='u/foobar123') + assert u.private_project() + assert M.Project.query.get(shortname='u/foobar123') + + def test_user_project_already_deleted_creates_on_demand(self): + u = M.User.register(dict(username='foobar123'), make_project=True) + p = M.Project.query.get(shortname='u/foobar123') + p.deleted = True + ThreadLocalORMSession.flush_all() + assert not M.Project.query.get(shortname='u/foobar123', deleted=False) + assert u.private_project() + ThreadLocalORMSession.flush_all() + assert M.Project.query.get(shortname='u/foobar123', deleted=False) + + def test_user_project_does_not_create_on_demand_for_disabled_user(self): + u = M.User.register( + dict(username='foobar123', disabled=True), make_project=False) + ThreadLocalORMSession.flush_all() + assert not u.private_project() + assert not M.Project.query.get(shortname='u/foobar123') + + def test_user_project_does_not_create_on_demand_for_anonymous_user(self): + u = M.User.anonymous() + ThreadLocalORMSession.flush_all() + assert not u.private_project() + assert not M.Project.query.get(shortname='u/anonymous') + assert not M.Project.query.get(shortname='u/*anonymous') + + @patch('allura.model.auth.log') + def test_user_by_email_address(self, log): + u1 = M.User.register(dict(username='abc1'), make_project=False) + u2 = M.User.register(dict(username='abc2'), make_project=False) + addr1 = M.EmailAddress(email='[email protected]', confirmed=True, + claimed_by_user_id=u1._id) + addr2 = M.EmailAddress(email='[email protected]', confirmed=True, + claimed_by_user_id=u2._id) + # both users are disabled + u1.disabled, u2.disabled = True, True + ThreadLocalORMSession.flush_all() + assert M.User.by_email_address('[email protected]') is None + assert log.warn.call_count == 0 + + # only u2 is active + u1.disabled, u2.disabled = True, False + ThreadLocalORMSession.flush_all() + assert M.User.by_email_address('[email protected]') == u2 + assert log.warn.call_count == 0 + + # both are active + u1.disabled, u2.disabled = False, False + ThreadLocalORMSession.flush_all() + assert M.User.by_email_address('[email protected]') in [u1, u2] + assert log.warn.call_count == 1 + + # invalid email returns None, but not user which claimed + # [email protected] as before + nobody = M.EmailAddress(email='[email protected]', confirmed=True, + claimed_by_user_id=u1._id) + ThreadLocalORMSession.flush_all() + assert M.User.by_email_address('[email protected]') == u1 + assert M.User.by_email_address('invalid') is None + + def test_user_equality(self): + assert M.User.by_username('test-user') == M.User.by_username('test-user') + assert M.User.anonymous() == M.User.anonymous() + assert M.User.by_username('*anonymous') == M.User.anonymous() + + assert M.User.by_username('test-user') != M.User.by_username('test-admin') + assert M.User.by_username('test-user') != M.User.anonymous() + assert M.User.anonymous() is not None + assert M.User.anonymous() != 12345 + assert M.User.anonymous() != M.User() + + def test_user_hash(self): + assert M.User.by_username('test-user') in {M.User.by_username('test-user')} + assert M.User.anonymous() in {M.User.anonymous()} + assert M.User.by_username('*anonymous') in {M.User.anonymous()} + + assert M.User.by_username('test-user') not in {M.User.by_username('test-admin')} + assert M.User.anonymous() not in {M.User.by_username('test-admin')} + assert M.User.anonymous() not in {0, None} + + def test_project_role(self): + role = M.ProjectRole(project_id=c.project._id, name='test_role') + M.ProjectRole.by_user(c.user, upsert=True).roles.append(role._id) + ThreadLocalORMSession.flush_all() + roles = g.credentials.user_roles( + c.user._id, project_id=c.project.root_project._id) + roles_ids = [r['_id'] for r in roles] + roles = M.ProjectRole.query.find({'_id': {'$in': roles_ids}}) + for pr in roles: + assert pr.display() + pr.special + assert pr.user in (c.user, None, M.User.anonymous()) + + def test_default_project_roles(self): + roles = { + pr.name: pr + for pr in M.ProjectRole.query.find(dict( + project_id=c.project._id)).all() + if pr.name} + assert 'Admin' in list(roles.keys()), list(roles.keys()) + assert 'Developer' in list(roles.keys()), list(roles.keys()) + assert 'Member' in list(roles.keys()), list(roles.keys()) + assert roles['Developer']._id in roles['Admin'].roles + assert roles['Member']._id in roles['Developer'].roles + + # There're 1 user assigned to project, represented by + # relational (vs named) ProjectRole's + assert len(roles) == M.ProjectRole.query.find(dict( + project_id=c.project._id)).count() - 1 + + def test_email_address_claimed_by_user(self): + addr = M.EmailAddress(email='[email protected]', + claimed_by_user_id=c.user._id) + c.user.disabled = True + ThreadLocalORMSession.flush_all() + assert addr.claimed_by_user() is None + + @td.with_user_project('test-admin') + def test_user_projects_by_role(self): + assert ({p.shortname for p in c.user.my_projects()} == + {'test', 'test2', 'u/test-admin', 'adobe-1', '--init--'}) + assert ({p.shortname for p in c.user.my_projects_by_role_name('Admin')} == + {'test', 'test2', 'u/test-admin', 'adobe-1', '--init--'}) + # Remove admin access from c.user to test2 project + project = M.Project.query.get(shortname='test2') + admin_role = M.ProjectRole.by_name('Admin', project) + developer_role = M.ProjectRole.by_name('Developer', project) + user_role = M.ProjectRole.by_user(c.user, project=project, upsert=True) + user_role.roles.remove(admin_role._id) + user_role.roles.append(developer_role._id) + ThreadLocalORMSession.flush_all() + g.credentials.clear() + assert ({p.shortname for p in c.user.my_projects()} == + {'test', 'test2', 'u/test-admin', 'adobe-1', '--init--'}) + assert ({p.shortname for p in c.user.my_projects_by_role_name('Admin')} == + {'test', 'u/test-admin', 'adobe-1', '--init--'}) + + @td.with_user_project('test-admin') + def test_user_projects_unnamed(self): + """ + Confirm that spurious ProjectRoles associating a user with + a project to which they do not belong to any named group + don't cause the user to count as a member of the project. + """ + sub1 = M.Project.query.get(shortname='test/sub1') + M.ProjectRole( + user_id=c.user._id, + project_id=sub1._id) + ThreadLocalORMSession.flush_all() + project_names = [p.shortname for p in c.user.my_projects()] + assert 'test/sub1' not in project_names + assert 'test' in project_names + + @patch.object(g, 'user_message_max_messages', 3) + def test_check_sent_user_message_times(self): + user1 = M.User.register(dict(username='test-user'), make_project=False) + time1 = datetime.utcnow() - timedelta(minutes=30) + time2 = datetime.utcnow() - timedelta(minutes=45) + time3 = datetime.utcnow() - timedelta(minutes=70) + user1.sent_user_message_times = [time1, time2, time3] + assert user1.can_send_user_message() + assert len(user1.sent_user_message_times) == 2 + user1.sent_user_message_times.append( + datetime.utcnow() - timedelta(minutes=15)) + assert not user1.can_send_user_message() + + @td.with_user_project('test-admin') + def test_user_track_active(self): + # without this session flushing inside track_active raises Exception + setup_functional_test() + c.user = M.User.by_username('test-admin') + + assert c.user.last_access['session_date'] is None + assert c.user.last_access['session_ip'] is None + assert c.user.last_access['session_ua'] is None + + req = Mock(headers={'User-Agent': 'browser'}, remote_addr='addr') + c.user.track_active(req) + c.user = M.User.by_username(c.user.username) + assert c.user.last_access['session_date'] is not None + assert c.user.last_access['session_ip'] == 'addr' + assert c.user.last_access['session_ua'] == 'browser' + + # ensure that session activity tracked with a whole-day granularity + prev_date = c.user.last_access['session_date'] + c.user.track_active(req) + c.user = M.User.by_username(c.user.username) + assert c.user.last_access['session_date'] == prev_date + yesterday = datetime.utcnow() - timedelta(1) + c.user.last_access['session_date'] = yesterday + session(c.user).flush(c.user) + c.user.track_active(req) + c.user = M.User.by_username(c.user.username) + assert c.user.last_access['session_date'] > yesterday + + # ...or if IP or User Agent has changed + req.remote_addr = 'new addr' + c.user.track_active(req) + c.user = M.User.by_username(c.user.username) + assert c.user.last_access['session_ip'] == 'new addr' + assert c.user.last_access['session_ua'] == 'browser' + req.headers['User-Agent'] = 'new browser' + c.user.track_active(req) + c.user = M.User.by_username(c.user.username) + assert c.user.last_access['session_ip'] == 'new addr' + assert c.user.last_access['session_ua'] == 'new browser' + + def test_user_index(self): + c.user.email_addresses = ['email1', 'email2'] + c.user.set_pref('email_address', 'email2') + idx = c.user.index() + assert idx['id'] == c.user.index_id() + assert idx['title'] == 'User test-admin' + assert idx['type_s'] == 'User' + assert idx['username_s'] == 'test-admin' + assert idx['email_addresses_t'] == 'email1 email2' + assert idx['email_address_s'] == 'email2' + assert 'last_password_updated_dt' in idx + assert idx['disabled_b'] is False + assert 'results_per_page_i' in idx + assert 'email_format_s' in idx + assert 'disable_user_messages_b' in idx + assert idx['display_name_t'] == 'Test Admin' + assert idx['sex_s'] == 'Unknown' + assert 'birthdate_dt' in idx + assert 'localization_s' in idx + assert 'timezone_s' in idx + assert 'socialnetworks_t' in idx + assert 'telnumbers_t' in idx + assert 'skypeaccount_s' in idx + assert 'webpages_t' in idx + assert 'skills_t' in idx + assert 'last_access_login_date_dt' in idx + assert 'last_access_login_ip_s' in idx + assert 'last_access_login_ua_t' in idx + assert 'last_access_session_date_dt' in idx + assert 'last_access_session_ip_s' in idx + assert 'last_access_session_ua_t' in idx + # provided bby auth provider + assert 'user_registration_date_dt' in idx + + def test_user_index_none_values(self): + c.user.email_addresses = [None] + c.user.set_pref('telnumbers', [None]) + c.user.set_pref('webpages', [None]) + idx = c.user.index() + assert idx['email_addresses_t'] == '' + assert idx['telnumbers_t'] == '' + assert idx['webpages_t'] == '' + + def test_user_backfill_login_details(self): + with h.push_config(r, user_agent='TestBrowser/55'): + # these shouldn't match + h.auditlog_user('something happened') + h.auditlog_user('blah blah Password changed') + with h.push_config(r, user_agent='TestBrowser/56'): + # these should all match, but only one entry created for this ip/ua + h.auditlog_user('Account activated') + h.auditlog_user('Successful login') + h.auditlog_user('Password changed') + with h.push_config(r, user_agent='TestBrowser/57'): + # this should match too + h.auditlog_user('Set up multifactor TOTP') + ThreadLocalORMSession.flush_all() + + auth_provider = plugin.AuthenticationProvider.get(None) + c.user.backfill_login_details(auth_provider) + + details = M.UserLoginDetails.query.find({'user_id': c.user._id}).sort('ua').all() + assert len(details) == 2, details + assert details[0].ip == '127.0.0.1' + assert details[0].ua == 'TestBrowser/56' + assert details[1].ip == '127.0.0.1' + assert details[1].ua == 'TestBrowser/57' @with_nose_compatibility class TestAuditLog: + @classmethod + def setup_class(cls): + setup_basic_test() + setup_global_objects() + def test_message_html(self): al = h.auditlog_user('our message <script>alert(1)</script>') assert al.message == textwrap.dedent('''\ diff --git a/Allura/allura/tests/model/test_discussion.py b/Allura/allura/tests/model/test_discussion.py index d5abf1a05..3c158cd3f 100644 --- a/Allura/allura/tests/model/test_discussion.py +++ b/Allura/allura/tests/model/test_discussion.py @@ -24,10 +24,8 @@ from datetime import datetime, timedelta from cgi import FieldStorage from tg import tmpl_context as c -from alluratest.tools import assert_equals, with_setup import mock from mock import patch -from alluratest.tools import assert_equal, assert_in from ming.orm import session, ThreadLocalORMSession from webob import exc @@ -38,513 +36,469 @@ from allura.tests import TestController from alluratest.controller import setup_global_objects -def setup_method(): - controller = TestController() - controller.setup_method(None) - controller.app.get('/wiki/Home/') - setup_global_objects() - ThreadLocalORMSession.close_all() - h.set_context('test', 'wiki', neighborhood='Projects') - ThreadLocalORMSession.flush_all() - ThreadLocalORMSession.close_all() - - -def teardown_module(): - ThreadLocalORMSession.close_all() - - -@with_setup(setup_method) -def test_discussion_methods(): - d = M.Discussion(shortname='test', name='test') - assert d.thread_class() == M.Thread - assert d.post_class() == M.Post - assert d.attachment_class() == M.DiscussionAttachment - ThreadLocalORMSession.flush_all() - d.update_stats() - ThreadLocalORMSession.flush_all() - assert d.last_post is None - assert d.url().endswith('wiki/_discuss/') - assert d.index()['name_s'] == 'test' - assert d.find_posts().count() == 0 - jsn = d.__json__() - assert jsn['name'] == d.name - d.delete() - ThreadLocalORMSession.flush_all() - ThreadLocalORMSession.close_all() - - -@with_setup(setup_method) -def test_thread_methods(): - d = M.Discussion(shortname='test', name='test') - t = M.Thread.new(discussion_id=d._id, subject='Test Thread') - assert t.discussion_class() == M.Discussion - assert t.post_class() == M.Post - assert t.attachment_class() == M.DiscussionAttachment - p0 = t.post('This is a post') - p1 = t.post('This is another post') - time.sleep(0.25) - t.post('This is a reply', parent_id=p0._id) - ThreadLocalORMSession.flush_all() - ThreadLocalORMSession.close_all() - d = M.Discussion.query.get(shortname='test') - t = d.threads[0] - assert d.last_post is not None - assert t.last_post is not None - t.create_post_threads(t.posts) - posts0 = t.find_posts(page=0, limit=10, style='threaded') - posts1 = t.find_posts(page=0, limit=10, style='timestamp') - assert posts0 != posts1 - ts = p0.timestamp.replace( - microsecond=int(p0.timestamp.microsecond // 1000) * 1000) - posts2 = t.find_posts(page=0, limit=10, style='threaded', timestamp=ts) - assert len(posts2) > 0 - - assert 'wiki/_discuss/' in t.url() - assert t.index()['views_i'] == 0 - assert t.post_count == 3 - jsn = t.__json__() - assert '_id' in jsn - assert len(jsn['posts']) == 3 - (p.approve() for p in (p0, p1)) - ThreadLocalORMSession.flush_all() - assert t.num_replies == 3 - t.spam() - assert t.num_replies == 0 - ThreadLocalORMSession.flush_all() - assert len(t.find_posts()) == 0 - t.delete() - - -@with_setup(setup_method) -def test_thread_new(): - with mock.patch('allura.model.discuss.h.nonce') as nonce: - nonce.side_effect = ['deadbeef', 'deadbeef', 'beefdead'] +class TestDiscussion: + + def setup_method(self): + controller = TestController() + controller.setup_method(None) + controller.app.get('/wiki/Home/') + setup_global_objects() + ThreadLocalORMSession.close_all() + h.set_context('test', 'wiki', neighborhood='Projects') + ThreadLocalORMSession.flush_all() + ThreadLocalORMSession.close_all() + + @classmethod + def teardown_class(cls): + ThreadLocalORMSession.close_all() + + def test_discussion_methods(self): d = M.Discussion(shortname='test', name='test') - t1 = M.Thread.new(discussion_id=d._id, subject='Test Thread One') - t2 = M.Thread.new(discussion_id=d._id, subject='Test Thread Two') + assert d.thread_class() == M.Thread + assert d.post_class() == M.Post + assert d.attachment_class() == M.DiscussionAttachment ThreadLocalORMSession.flush_all() - session(t1).expunge(t1) - session(t2).expunge(t2) - t1_2 = M.Thread.query.get(_id=t1._id) - t2_2 = M.Thread.query.get(_id=t2._id) - assert t1._id == 'deadbeef' - assert t2._id == 'beefdead' - assert t1_2.subject == 'Test Thread One' - assert t2_2.subject == 'Test Thread Two' - - -@with_setup(setup_method) -def test_post_methods(): - d = M.Discussion(shortname='test', name='test') - t = M.Thread.new(discussion_id=d._id, subject='Test Thread') - p = t.post('This is a post') - p2 = t.post('This is another post') - assert p.discussion_class() == M.Discussion - assert p.thread_class() == M.Thread - assert p.attachment_class() == M.DiscussionAttachment - p.commit() - assert p.parent is None - assert p.subject == 'Test Thread' - assert p.attachments == [] - assert 'wiki/_discuss' in p.url() - assert p.reply_subject() == 'Re: Test Thread' - assert p.link_text() == p.subject - - ss = p.history().first() - assert 'version' in h.get_first(ss.index(), 'title') - assert '#' in ss.shorthand_id() - - jsn = p.__json__() - assert jsn["thread_id"] == t._id - - (p.approve() for p in (p, p2)) - ThreadLocalORMSession.flush_all() - assert t.num_replies == 2 - p.spam() - assert t.num_replies == 1 - p.undo('ok') - assert t.num_replies == 2 - p.delete() - assert t.num_replies == 1 - - -@with_setup(setup_method) -def test_attachment_methods(): - d = M.Discussion(shortname='test', name='test') - t = M.Thread.new(discussion_id=d._id, subject='Test Thread') - p = t.post('This is a post') - p_att = p.attach('foo.text', BytesIO(b'Hello, world!'), - discussion_id=d._id, - thread_id=t._id, - post_id=p._id) - t_att = p.attach('foo2.text', BytesIO(b'Hello, thread!'), - discussion_id=d._id, - thread_id=t._id) - d_att = p.attach('foo3.text', BytesIO(b'Hello, discussion!'), - discussion_id=d._id) - - ThreadLocalORMSession.flush_all() - assert p_att.post == p - assert p_att.thread == t - assert p_att.discussion == d - for att in (p_att, t_att, d_att): - assert 'wiki/_discuss' in att.url() - assert 'attachment/' in att.url() - - # Test notification in mail - t = M.Thread.new(discussion_id=d._id, subject='Test comment notification') - fs = FieldStorage() - fs.name = 'file_info' - fs.filename = 'fake.txt' - fs.type = 'text/plain' - fs.file = BytesIO(b'this is the content of the fake file\n') - p = t.post(text='test message', forum=None, subject='', file_info=fs) - ThreadLocalORMSession.flush_all() - n = M.Notification.query.get( - subject='[test:wiki] Test comment notification') - url = h.absurl(f'{p.url()}attachment/{fs.filename}') - assert ( - '\nAttachments:\n\n' - '- [fake.txt]({}) (37 Bytes; text/plain)'.format(url) in - n.text) - - -@with_setup(setup_method) -def test_multiple_attachments(): - test_file1 = FieldStorage() - test_file1.name = 'file_info' - test_file1.filename = 'test1.txt' - test_file1.type = 'text/plain' - test_file1.file = BytesIO(b'test file1\n') - test_file2 = FieldStorage() - test_file2.name = 'file_info' - test_file2.filename = 'test2.txt' - test_file2.type = 'text/plain' - test_file2.file = BytesIO(b'test file2\n') - d = M.Discussion(shortname='test', name='test') - t = M.Thread.new(discussion_id=d._id, subject='Test Thread') - test_post = t.post('test post') - test_post.add_multiple_attachments([test_file1, test_file2]) - ThreadLocalORMSession.flush_all() - assert len(test_post.attachments) == 2 - attaches = test_post.attachments - assert 'test1.txt' in [attaches[0].filename, attaches[1].filename] - assert 'test2.txt' in [attaches[0].filename, attaches[1].filename] - - -@with_setup(setup_method) -def test_add_attachment(): - test_file = FieldStorage() - test_file.name = 'file_info' - test_file.filename = 'test.txt' - test_file.type = 'text/plain' - test_file.file = BytesIO(b'test file\n') - d = M.Discussion(shortname='test', name='test') - t = M.Thread.new(discussion_id=d._id, subject='Test Thread') - test_post = t.post('test post') - test_post.add_attachment(test_file) - ThreadLocalORMSession.flush_all() - assert len(test_post.attachments) == 1 - attach = test_post.attachments[0] - assert attach.filename == 'test.txt', attach.filename - assert attach.content_type == 'text/plain', attach.content_type - - -def test_notification_two_attaches(): - d = M.Discussion(shortname='test', name='test') - t = M.Thread.new(discussion_id=d._id, subject='Test comment notification') - fs1 = FieldStorage() - fs1.name = 'file_info' - fs1.filename = 'fake.txt' - fs1.type = 'text/plain' - fs1.file = BytesIO(b'this is the content of the fake file\n') - fs2 = FieldStorage() - fs2.name = 'file_info' - fs2.filename = 'fake2.txt' - fs2.type = 'text/plain' - fs2.file = BytesIO(b'this is the content of the fake file\n') - p = t.post(text='test message', forum=None, subject='', file_info=[fs1, fs2]) - ThreadLocalORMSession.flush_all() - n = M.Notification.query.get( - subject='[test:wiki] Test comment notification') - base_url = h.absurl(f'{p.url()}attachment/') - assert ( - '\nAttachments:\n\n' - '- [fake.txt]({0}fake.txt) (37 Bytes; text/plain)\n' - '- [fake2.txt]({0}fake2.txt) (37 Bytes; text/plain)'.format(base_url) in - n.text) - - -@with_setup(setup_method) -def test_discussion_delete(): - d = M.Discussion(shortname='test', name='test') - t = M.Thread.new(discussion_id=d._id, subject='Test Thread') - p = t.post('This is a post') - p.attach('foo.text', BytesIO(b''), - discussion_id=d._id, - thread_id=t._id, - post_id=p._id) - M.ArtifactReference.from_artifact(d) - rid = d.index_id() - ThreadLocalORMSession.flush_all() - d.delete() - ThreadLocalORMSession.flush_all() - assert M.ArtifactReference.query.find(dict(_id=rid)).count() == 0 - - -@with_setup(setup_method) -def test_thread_delete(): - d = M.Discussion(shortname='test', name='test') - t = M.Thread.new(discussion_id=d._id, subject='Test Thread') - p = t.post('This is a post') - p.attach('foo.text', BytesIO(b''), - discussion_id=d._id, - thread_id=t._id, - post_id=p._id) - ThreadLocalORMSession.flush_all() - t.delete() - - -@with_setup(setup_method) -def test_post_delete(): - d = M.Discussion(shortname='test', name='test') - t = M.Thread.new(discussion_id=d._id, subject='Test Thread') - p = t.post('This is a post') - p.attach('foo.text', BytesIO(b''), - discussion_id=d._id, - thread_id=t._id, - post_id=p._id) - ThreadLocalORMSession.flush_all() - p.delete() - - -@with_setup(setup_method) -def test_post_undo(): - d = M.Discussion(shortname='test', name='test') - t = M.Thread.new(discussion_id=d._id, subject='Test Thread') - p = t.post('This is a post') - t.post('This is a post2') - t.post('This is a post3') - ThreadLocalORMSession.flush_all() - assert t.num_replies == 3 - p.spam() - assert t.num_replies == 2 - p.undo('ok') - assert t.num_replies == 3 - - -@with_setup(setup_method) -def test_post_permission_check(): - d = M.Discussion(shortname='test', name='test') - t = M.Thread.new(discussion_id=d._id, subject='Test Thread') - c.user = M.User.anonymous() - try: - t.post('This post will fail the check.') - assert False, "Expected an anonymous post to fail." - except exc.HTTPUnauthorized: - pass - t.post('This post will pass the check.', ignore_security=True) - - -@with_setup(setup_method) -def test_post_url_paginated(): - d = M.Discussion(shortname='test', name='test') - t = M.Thread(discussion_id=d._id, subject='Test Thread') - p = [] # posts in display order - ts = datetime.utcnow() - timedelta(days=1) - for i in range(5): - ts += timedelta(minutes=1) - p.append(t.post('This is a post #%s' % i, timestamp=ts)) - - ts += timedelta(minutes=1) - p.insert(1, t.post( - 'This is reply #0 to post #0', parent_id=p[0]._id, timestamp=ts)) - - ts += timedelta(minutes=1) - p.insert(2, t.post( - 'This is reply #1 to post #0', parent_id=p[0]._id, timestamp=ts)) - - ts += timedelta(minutes=1) - p.insert(4, t.post( - 'This is reply #0 to post #1', parent_id=p[3]._id, timestamp=ts)) - - ts += timedelta(minutes=1) - p.insert(6, t.post( - 'This is reply #0 to post #2', parent_id=p[5]._id, timestamp=ts)) - - ts += timedelta(minutes=1) - p.insert(7, t.post( - 'This is reply #1 to post #2', parent_id=p[5]._id, timestamp=ts)) - - ts += timedelta(minutes=1) - p.insert(8, t.post( - 'This is reply #0 to reply #1 to post #2', - parent_id=p[7]._id, timestamp=ts)) - - # with default paging limit - for _p in p: - url = t.url() + '?limit=25#' + _p.slug - assert _p.url_paginated() == url, _p.url_paginated() - - # with user paging limit - limit = 3 - c.user.set_pref('results_per_page', limit) - for i, _p in enumerate(p): - page = i // limit - url = t.url() + '?limit=%s' % limit - if page > 0: - url += '&page=%s' % page - url += '#' + _p.slug - assert _p.url_paginated() == url - - -@with_setup(setup_method) -def test_post_url_paginated_with_artifact(): - """Post.url_paginated should return link to attached artifact, if any""" - from forgewiki.model import Page - page = Page.upsert(title='Test Page') - thread = page.discussion_thread - comment = thread.post('Comment') - url = page.url() + '?limit=25#' + comment.slug - assert comment.url_paginated() == url - - -@with_setup(setup_method) -def test_post_notify(): - d = M.Discussion(shortname='test', name='test') - d.monitoring_email = '[email protected]' - t = M.Thread.new(discussion_id=d._id, subject='Test Thread') - with patch('allura.model.notification.Notification.send_simple') as send: - t.post('This is a post') - send.assert_called_with(d.monitoring_email) + d.update_stats() + ThreadLocalORMSession.flush_all() + assert d.last_post is None + assert d.url().endswith('wiki/_discuss/') + assert d.index()['name_s'] == 'test' + assert d.find_posts().count() == 0 + jsn = d.__json__() + assert jsn['name'] == d.name + d.delete() + ThreadLocalORMSession.flush_all() + ThreadLocalORMSession.close_all() + + def test_thread_methods(self): + d = M.Discussion(shortname='test', name='test') + t = M.Thread.new(discussion_id=d._id, subject='Test Thread') + assert t.discussion_class() == M.Discussion + assert t.post_class() == M.Post + assert t.attachment_class() == M.DiscussionAttachment + p0 = t.post('This is a post') + p1 = t.post('This is another post') + time.sleep(0.25) + t.post('This is a reply', parent_id=p0._id) + ThreadLocalORMSession.flush_all() + ThreadLocalORMSession.close_all() + d = M.Discussion.query.get(shortname='test') + t = d.threads[0] + assert d.last_post is not None + assert t.last_post is not None + t.create_post_threads(t.posts) + posts0 = t.find_posts(page=0, limit=10, style='threaded') + posts1 = t.find_posts(page=0, limit=10, style='timestamp') + assert posts0 != posts1 + ts = p0.timestamp.replace( + microsecond=int(p0.timestamp.microsecond // 1000) * 1000) + posts2 = t.find_posts(page=0, limit=10, style='threaded', timestamp=ts) + assert len(posts2) > 0 + + assert 'wiki/_discuss/' in t.url() + assert t.index()['views_i'] == 0 + assert t.post_count == 3 + jsn = t.__json__() + assert '_id' in jsn + assert len(jsn['posts']) == 3 + (p.approve() for p in (p0, p1)) + ThreadLocalORMSession.flush_all() + assert t.num_replies == 3 + t.spam() + assert t.num_replies == 0 + ThreadLocalORMSession.flush_all() + assert len(t.find_posts()) == 0 + t.delete() + + def test_thread_new(self): + with mock.patch('allura.model.discuss.h.nonce') as nonce: + nonce.side_effect = ['deadbeef', 'deadbeef', 'beefdead'] + d = M.Discussion(shortname='test', name='test') + t1 = M.Thread.new(discussion_id=d._id, subject='Test Thread One') + t2 = M.Thread.new(discussion_id=d._id, subject='Test Thread Two') + ThreadLocalORMSession.flush_all() + session(t1).expunge(t1) + session(t2).expunge(t2) + t1_2 = M.Thread.query.get(_id=t1._id) + t2_2 = M.Thread.query.get(_id=t2._id) + assert t1._id == 'deadbeef' + assert t2._id == 'beefdead' + assert t1_2.subject == 'Test Thread One' + assert t2_2.subject == 'Test Thread Two' + + def test_post_methods(self): + d = M.Discussion(shortname='test', name='test') + t = M.Thread.new(discussion_id=d._id, subject='Test Thread') + p = t.post('This is a post') + p2 = t.post('This is another post') + assert p.discussion_class() == M.Discussion + assert p.thread_class() == M.Thread + assert p.attachment_class() == M.DiscussionAttachment + p.commit() + assert p.parent is None + assert p.subject == 'Test Thread' + assert p.attachments == [] + assert 'wiki/_discuss' in p.url() + assert p.reply_subject() == 'Re: Test Thread' + assert p.link_text() == p.subject + + ss = p.history().first() + assert 'version' in h.get_first(ss.index(), 'title') + assert '#' in ss.shorthand_id() + + jsn = p.__json__() + assert jsn["thread_id"] == t._id + + (p.approve() for p in (p, p2)) + ThreadLocalORMSession.flush_all() + assert t.num_replies == 2 + p.spam() + assert t.num_replies == 1 + p.undo('ok') + assert t.num_replies == 2 + p.delete() + assert t.num_replies == 1 + + def test_attachment_methods(self): + d = M.Discussion(shortname='test', name='test') + t = M.Thread.new(discussion_id=d._id, subject='Test Thread') + p = t.post('This is a post') + p_att = p.attach('foo.text', BytesIO(b'Hello, world!'), + discussion_id=d._id, + thread_id=t._id, + post_id=p._id) + t_att = p.attach('foo2.text', BytesIO(b'Hello, thread!'), + discussion_id=d._id, + thread_id=t._id) + d_att = p.attach('foo3.text', BytesIO(b'Hello, discussion!'), + discussion_id=d._id) + + ThreadLocalORMSession.flush_all() + assert p_att.post == p + assert p_att.thread == t + assert p_att.discussion == d + for att in (p_att, t_att, d_att): + assert 'wiki/_discuss' in att.url() + assert 'attachment/' in att.url() + + # Test notification in mail + t = M.Thread.new(discussion_id=d._id, subject='Test comment notification') + fs = FieldStorage() + fs.name = 'file_info' + fs.filename = 'fake.txt' + fs.type = 'text/plain' + fs.file = BytesIO(b'this is the content of the fake file\n') + p = t.post(text='test message', forum=None, subject='', file_info=fs) + ThreadLocalORMSession.flush_all() + n = M.Notification.query.get( + subject='[test:wiki] Test comment notification') + url = h.absurl(f'{p.url()}attachment/{fs.filename}') + assert ( + '\nAttachments:\n\n' + '- [fake.txt]({}) (37 Bytes; text/plain)'.format(url) in + n.text) + + def test_multiple_attachments(self): + test_file1 = FieldStorage() + test_file1.name = 'file_info' + test_file1.filename = 'test1.txt' + test_file1.type = 'text/plain' + test_file1.file = BytesIO(b'test file1\n') + test_file2 = FieldStorage() + test_file2.name = 'file_info' + test_file2.filename = 'test2.txt' + test_file2.type = 'text/plain' + test_file2.file = BytesIO(b'test file2\n') + d = M.Discussion(shortname='test', name='test') + t = M.Thread.new(discussion_id=d._id, subject='Test Thread') + test_post = t.post('test post') + test_post.add_multiple_attachments([test_file1, test_file2]) + ThreadLocalORMSession.flush_all() + assert len(test_post.attachments) == 2 + attaches = test_post.attachments + assert 'test1.txt' in [attaches[0].filename, attaches[1].filename] + assert 'test2.txt' in [attaches[0].filename, attaches[1].filename] + + def test_add_attachment(self): + test_file = FieldStorage() + test_file.name = 'file_info' + test_file.filename = 'test.txt' + test_file.type = 'text/plain' + test_file.file = BytesIO(b'test file\n') + d = M.Discussion(shortname='test', name='test') + t = M.Thread.new(discussion_id=d._id, subject='Test Thread') + test_post = t.post('test post') + test_post.add_attachment(test_file) + ThreadLocalORMSession.flush_all() + assert len(test_post.attachments) == 1 + attach = test_post.attachments[0] + assert attach.filename == 'test.txt', attach.filename + assert attach.content_type == 'text/plain', attach.content_type + + def test_notification_two_attaches(self): + d = M.Discussion(shortname='test', name='test') + t = M.Thread.new(discussion_id=d._id, subject='Test comment notification') + fs1 = FieldStorage() + fs1.name = 'file_info' + fs1.filename = 'fake.txt' + fs1.type = 'text/plain' + fs1.file = BytesIO(b'this is the content of the fake file\n') + fs2 = FieldStorage() + fs2.name = 'file_info' + fs2.filename = 'fake2.txt' + fs2.type = 'text/plain' + fs2.file = BytesIO(b'this is the content of the fake file\n') + p = t.post(text='test message', forum=None, subject='', file_info=[fs1, fs2]) + ThreadLocalORMSession.flush_all() + n = M.Notification.query.get( + subject='[test:wiki] Test comment notification') + base_url = h.absurl(f'{p.url()}attachment/') + assert ( + '\nAttachments:\n\n' + '- [fake.txt]({0}fake.txt) (37 Bytes; text/plain)\n' + '- [fake2.txt]({0}fake2.txt) (37 Bytes; text/plain)'.format(base_url) in + n.text) + + def test_discussion_delete(self): + d = M.Discussion(shortname='test', name='test') + t = M.Thread.new(discussion_id=d._id, subject='Test Thread') + p = t.post('This is a post') + p.attach('foo.text', BytesIO(b''), + discussion_id=d._id, + thread_id=t._id, + post_id=p._id) + M.ArtifactReference.from_artifact(d) + rid = d.index_id() + ThreadLocalORMSession.flush_all() + d.delete() + ThreadLocalORMSession.flush_all() + assert M.ArtifactReference.query.find(dict(_id=rid)).count() == 0 + + def test_thread_delete(self): + d = M.Discussion(shortname='test', name='test') + t = M.Thread.new(discussion_id=d._id, subject='Test Thread') + p = t.post('This is a post') + p.attach('foo.text', BytesIO(b''), + discussion_id=d._id, + thread_id=t._id, + post_id=p._id) + ThreadLocalORMSession.flush_all() + t.delete() + + def test_post_delete(self): + d = M.Discussion(shortname='test', name='test') + t = M.Thread.new(discussion_id=d._id, subject='Test Thread') + p = t.post('This is a post') + p.attach('foo.text', BytesIO(b''), + discussion_id=d._id, + thread_id=t._id, + post_id=p._id) + ThreadLocalORMSession.flush_all() + p.delete() + + def test_post_undo(self): + d = M.Discussion(shortname='test', name='test') + t = M.Thread.new(discussion_id=d._id, subject='Test Thread') + p = t.post('This is a post') + t.post('This is a post2') + t.post('This is a post3') + ThreadLocalORMSession.flush_all() + assert t.num_replies == 3 + p.spam() + assert t.num_replies == 2 + p.undo('ok') + assert t.num_replies == 3 - c.app.config.project.notifications_disabled = True - with patch('allura.model.notification.Notification.send_simple') as send: - t.post('Another post') + def test_post_permission_check(self): + d = M.Discussion(shortname='test', name='test') + t = M.Thread.new(discussion_id=d._id, subject='Test Thread') + c.user = M.User.anonymous() try: + t.post('This post will fail the check.') + assert False, "Expected an anonymous post to fail." + except exc.HTTPUnauthorized: + pass + t.post('This post will pass the check.', ignore_security=True) + + def test_post_url_paginated(self): + d = M.Discussion(shortname='test', name='test') + t = M.Thread(discussion_id=d._id, subject='Test Thread') + p = [] # posts in display order + ts = datetime.utcnow() - timedelta(days=1) + for i in range(5): + ts += timedelta(minutes=1) + p.append(t.post('This is a post #%s' % i, timestamp=ts)) + + ts += timedelta(minutes=1) + p.insert(1, t.post( + 'This is reply #0 to post #0', parent_id=p[0]._id, timestamp=ts)) + + ts += timedelta(minutes=1) + p.insert(2, t.post( + 'This is reply #1 to post #0', parent_id=p[0]._id, timestamp=ts)) + + ts += timedelta(minutes=1) + p.insert(4, t.post( + 'This is reply #0 to post #1', parent_id=p[3]._id, timestamp=ts)) + + ts += timedelta(minutes=1) + p.insert(6, t.post( + 'This is reply #0 to post #2', parent_id=p[5]._id, timestamp=ts)) + + ts += timedelta(minutes=1) + p.insert(7, t.post( + 'This is reply #1 to post #2', parent_id=p[5]._id, timestamp=ts)) + + ts += timedelta(minutes=1) + p.insert(8, t.post( + 'This is reply #0 to reply #1 to post #2', + parent_id=p[7]._id, timestamp=ts)) + + # with default paging limit + for _p in p: + url = t.url() + '?limit=25#' + _p.slug + assert _p.url_paginated() == url, _p.url_paginated() + + # with user paging limit + limit = 3 + c.user.set_pref('results_per_page', limit) + for i, _p in enumerate(p): + page = i // limit + url = t.url() + '?limit=%s' % limit + if page > 0: + url += '&page=%s' % page + url += '#' + _p.slug + assert _p.url_paginated() == url + + def test_post_url_paginated_with_artifact(self): + """Post.url_paginated should return link to attached artifact, if any""" + from forgewiki.model import Page + page = Page.upsert(title='Test Page') + thread = page.discussion_thread + comment = thread.post('Comment') + url = page.url() + '?limit=25#' + comment.slug + assert comment.url_paginated() == url + + def test_post_notify(self): + d = M.Discussion(shortname='test', name='test') + d.monitoring_email = '[email protected]' + t = M.Thread.new(discussion_id=d._id, subject='Test Thread') + with patch('allura.model.notification.Notification.send_simple') as send: + t.post('This is a post') send.assert_called_with(d.monitoring_email) - except AssertionError: - pass # method not called as expected - else: - assert False, 'send_simple must not be called' - - -@with_setup(setup_method) -@patch('allura.model.discuss.c.project.users_with_role') -def test_is_spam_for_admin(users): - users.return_value = [c.user, ] - d = M.Discussion(shortname='test', name='test') - t = M.Thread(discussion_id=d._id, subject='Test Thread') - t.post('This is a post') - post = M.Post.query.get(text='This is a post') - assert not t.is_spam(post), t.is_spam(post) - - -@with_setup(setup_method) -@patch('allura.model.discuss.c.project.users_with_role') -def test_is_spam(role): - d = M.Discussion(shortname='test', name='test') - t = M.Thread(discussion_id=d._id, subject='Test Thread') - role.return_value = [] - with mock.patch('allura.controllers.discuss.g.spam_checker') as spam_checker: + + c.app.config.project.notifications_disabled = True + with patch('allura.model.notification.Notification.send_simple') as send: + t.post('Another post') + try: + send.assert_called_with(d.monitoring_email) + except AssertionError: + pass # method not called as expected + else: + assert False, 'send_simple must not be called' + + @patch('allura.model.discuss.c.project.users_with_role') + def test_is_spam_for_admin(self, users): + users.return_value = [c.user, ] + d = M.Discussion(shortname='test', name='test') + t = M.Thread(discussion_id=d._id, subject='Test Thread') + t.post('This is a post') + post = M.Post.query.get(text='This is a post') + assert not t.is_spam(post), t.is_spam(post) + + @patch('allura.model.discuss.c.project.users_with_role') + def test_is_spam(self, role): + d = M.Discussion(shortname='test', name='test') + t = M.Thread(discussion_id=d._id, subject='Test Thread') + role.return_value = [] + with mock.patch('allura.controllers.discuss.g.spam_checker') as spam_checker: + spam_checker.check.return_value = True + post = mock.Mock() + assert t.is_spam(post), t.is_spam(post) + assert spam_checker.check.call_count == 1, spam_checker.call_count + + @mock.patch('allura.controllers.discuss.g.spam_checker') + def test_not_spam_and_has_unmoderated_post_permission(self, spam_checker): + spam_checker.check.return_value = False + d = M.Discussion(shortname='test', name='test') + t = M.Thread(discussion_id=d._id, subject='Test Thread') + role = M.ProjectRole.by_name('*anonymous')._id + post_permission = M.ACE.allow(role, 'post') + unmoderated_post_permission = M.ACE.allow(role, 'unmoderated_post') + t.acl.append(post_permission) + t.acl.append(unmoderated_post_permission) + with h.push_config(c, user=M.User.anonymous()): + post = t.post('Hey') + assert post.status == 'ok' + + @mock.patch('allura.controllers.discuss.g.spam_checker') + @mock.patch.object(M.Thread, 'notify_moderators') + def test_not_spam_but_has_no_unmoderated_post_permission(self, notify_moderators, spam_checker): + spam_checker.check.return_value = False + d = M.Discussion(shortname='test', name='test') + t = M.Thread(discussion_id=d._id, subject='Test Thread') + role = M.ProjectRole.by_name('*anonymous')._id + post_permission = M.ACE.allow(role, 'post') + t.acl.append(post_permission) + with h.push_config(c, user=M.User.anonymous()): + post = t.post('Hey') + assert post.status == 'pending' + assert notify_moderators.call_count == 1 + + @mock.patch('allura.controllers.discuss.g.spam_checker') + @mock.patch.object(M.Thread, 'notify_moderators') + def test_spam_and_has_unmoderated_post_permission(self, notify_moderators, spam_checker): spam_checker.check.return_value = True - post = mock.Mock() - assert t.is_spam(post), t.is_spam(post) - assert spam_checker.check.call_count == 1, spam_checker.call_count - - -@with_setup(setup_method) [email protected]('allura.controllers.discuss.g.spam_checker') -def test_not_spam_and_has_unmoderated_post_permission(spam_checker): - spam_checker.check.return_value = False - d = M.Discussion(shortname='test', name='test') - t = M.Thread(discussion_id=d._id, subject='Test Thread') - role = M.ProjectRole.by_name('*anonymous')._id - post_permission = M.ACE.allow(role, 'post') - unmoderated_post_permission = M.ACE.allow(role, 'unmoderated_post') - t.acl.append(post_permission) - t.acl.append(unmoderated_post_permission) - with h.push_config(c, user=M.User.anonymous()): - post = t.post('Hey') - assert post.status == 'ok' - - -@with_setup(setup_method) [email protected]('allura.controllers.discuss.g.spam_checker') [email protected](M.Thread, 'notify_moderators') -def test_not_spam_but_has_no_unmoderated_post_permission(notify_moderators, spam_checker): - spam_checker.check.return_value = False - d = M.Discussion(shortname='test', name='test') - t = M.Thread(discussion_id=d._id, subject='Test Thread') - role = M.ProjectRole.by_name('*anonymous')._id - post_permission = M.ACE.allow(role, 'post') - t.acl.append(post_permission) - with h.push_config(c, user=M.User.anonymous()): - post = t.post('Hey') - assert post.status == 'pending' - assert notify_moderators.call_count == 1 - - -@with_setup(setup_method) [email protected]('allura.controllers.discuss.g.spam_checker') [email protected](M.Thread, 'notify_moderators') -def test_spam_and_has_unmoderated_post_permission(notify_moderators, spam_checker): - spam_checker.check.return_value = True - d = M.Discussion(shortname='test', name='test') - t = M.Thread(discussion_id=d._id, subject='Test Thread') - role = M.ProjectRole.by_name('*anonymous')._id - post_permission = M.ACE.allow(role, 'post') - unmoderated_post_permission = M.ACE.allow(role, 'unmoderated_post') - t.acl.append(post_permission) - t.acl.append(unmoderated_post_permission) - with h.push_config(c, user=M.User.anonymous()): - post = t.post('Hey') - assert post.status == 'pending' - assert notify_moderators.call_count == 1 - - -@with_setup(setup_method) [email protected]('allura.controllers.discuss.g.spam_checker') -def test_thread_subject_not_included_in_text_checked(spam_checker): - spam_checker.check.return_value = False - d = M.Discussion(shortname='test', name='test') - t = M.Thread(discussion_id=d._id, subject='Test Thread') - t.post('Hello') - assert spam_checker.check.call_count == 1 - assert spam_checker.check.call_args[0][0] == 'Hello' - - -def test_post_count(): - d = M.Discussion(shortname='test', name='test') - t = M.Thread(discussion_id=d._id, subject='Test Thread') - M.Post(discussion_id=d._id, thread_id=t._id, status='spam') - M.Post(discussion_id=d._id, thread_id=t._id, status='ok') - M.Post(discussion_id=d._id, thread_id=t._id, status='pending') - ThreadLocalORMSession.flush_all() - assert t.post_count == 2 - - [email protected]('allura.controllers.discuss.g.spam_checker') -def test_spam_num_replies(spam_checker): - d = M.Discussion(shortname='test', name='test') - t = M.Thread(discussion_id=d._id, subject='Test Thread', num_replies=2) - M.Post(discussion_id=d._id, thread_id=t._id, status='ok') - ThreadLocalORMSession.flush_all() - p1 = M.Post(discussion_id=d._id, thread_id=t._id, status='spam') - p1.spam() - assert t.num_replies == 1 - - -def test_deleted_thread_index(): - d = M.Discussion(shortname='test', name='test') - t = M.Thread(discussion_id=d._id, subject='Test Thread') - p = M.Post(discussion_id=d._id, thread_id=t._id, status='ok') - t.delete() - ThreadLocalORMSession.flush_all() - - # re-query, so relationships get reloaded - ThreadLocalORMSession.close_all() - p = M.Post.query.get(_id=p._id) - - # just make sure this doesn't fail - p.index() \ No newline at end of file + d = M.Discussion(shortname='test', name='test') + t = M.Thread(discussion_id=d._id, subject='Test Thread') + role = M.ProjectRole.by_name('*anonymous')._id + post_permission = M.ACE.allow(role, 'post') + unmoderated_post_permission = M.ACE.allow(role, 'unmoderated_post') + t.acl.append(post_permission) + t.acl.append(unmoderated_post_permission) + with h.push_config(c, user=M.User.anonymous()): + post = t.post('Hey') + assert post.status == 'pending' + assert notify_moderators.call_count == 1 + + @mock.patch('allura.controllers.discuss.g.spam_checker') + def test_thread_subject_not_included_in_text_checked(self, spam_checker): + spam_checker.check.return_value = False + d = M.Discussion(shortname='test', name='test') + t = M.Thread(discussion_id=d._id, subject='Test Thread') + t.post('Hello') + assert spam_checker.check.call_count == 1 + assert spam_checker.check.call_args[0][0] == 'Hello' + + def test_post_count(self): + d = M.Discussion(shortname='test', name='test') + t = M.Thread(discussion_id=d._id, subject='Test Thread') + M.Post(discussion_id=d._id, thread_id=t._id, status='spam') + M.Post(discussion_id=d._id, thread_id=t._id, status='ok') + M.Post(discussion_id=d._id, thread_id=t._id, status='pending') + ThreadLocalORMSession.flush_all() + assert t.post_count == 2 + + @mock.patch('allura.controllers.discuss.g.spam_checker') + def test_spam_num_replies(self, spam_checker): + d = M.Discussion(shortname='test', name='test') + t = M.Thread(discussion_id=d._id, subject='Test Thread', num_replies=2) + M.Post(discussion_id=d._id, thread_id=t._id, status='ok') + ThreadLocalORMSession.flush_all() + p1 = M.Post(discussion_id=d._id, thread_id=t._id, status='spam') + p1.spam() + assert t.num_replies == 1 + + def test_deleted_thread_index(self): + d = M.Discussion(shortname='test', name='test') + t = M.Thread(discussion_id=d._id, subject='Test Thread') + p = M.Post(discussion_id=d._id, thread_id=t._id, status='ok') + t.delete() + ThreadLocalORMSession.flush_all() + + # re-query, so relationships get reloaded + ThreadLocalORMSession.close_all() + p = M.Post.query.get(_id=p._id) + + # just make sure this doesn't fail + p.index() diff --git a/Allura/allura/tests/model/test_monq.py b/Allura/allura/tests/model/test_monq.py index 9dc682149..9b2d51754 100644 --- a/Allura/allura/tests/model/test_monq.py +++ b/Allura/allura/tests/model/test_monq.py @@ -16,7 +16,6 @@ # under the License. import pprint -from alluratest.tools import with_setup from ming.orm import ThreadLocalORMSession @@ -24,14 +23,13 @@ from alluratest.controller import setup_basic_test, setup_global_objects from allura import model as M -def setup_method(): +def setup_module(): setup_basic_test() ThreadLocalORMSession.close_all() setup_global_objects() M.MonQTask.query.remove({}) -@with_setup(setup_method) def test_basic_task(): task = M.MonQTask.post(pprint.pformat, ([5, 6],)) ThreadLocalORMSession.flush_all() diff --git a/Allura/allura/tests/model/test_neighborhood.py b/Allura/allura/tests/model/test_neighborhood.py index 26a773331..247370888 100644 --- a/Allura/allura/tests/model/test_neighborhood.py +++ b/Allura/allura/tests/model/test_neighborhood.py @@ -25,65 +25,64 @@ from allura.tests import decorators as td from alluratest.controller import setup_basic_test, setup_global_objects -def setup_method(): - setup_basic_test() - setup_with_tools() +class TestNeighboorhoodModel: + def setup_method(self): + setup_basic_test() + self.setup_with_tools() [email protected]_wiki -def setup_with_tools(): - setup_global_objects() + @td.with_wiki + def setup_with_tools(self): + setup_global_objects() + def test_neighborhood(self): + neighborhood = M.Neighborhood.query.get(name='Projects') + # Check css output depends of neighborhood level + test_css = ".text{color:#000;}" + neighborhood.css = test_css + neighborhood.features['css'] = 'none' + assert neighborhood.get_custom_css() == "" + neighborhood.features['css'] = 'picker' + assert neighborhood.get_custom_css() == test_css + neighborhood.features['css'] = 'custom' + assert neighborhood.get_custom_css() == test_css + # Check max projects + neighborhood.features['max_projects'] = None + assert neighborhood.get_max_projects() is None + neighborhood.features['max_projects'] = 500 + assert neighborhood.get_max_projects() == 500 -@with_setup(setup_method) -def test_neighborhood(): - neighborhood = M.Neighborhood.query.get(name='Projects') - # Check css output depends of neighborhood level - test_css = ".text{color:#000;}" - neighborhood.css = test_css - neighborhood.features['css'] = 'none' - assert neighborhood.get_custom_css() == "" - neighborhood.features['css'] = 'picker' - assert neighborhood.get_custom_css() == test_css - neighborhood.features['css'] = 'custom' - assert neighborhood.get_custom_css() == test_css - # Check max projects - neighborhood.features['max_projects'] = None - assert neighborhood.get_max_projects() is None - neighborhood.features['max_projects'] = 500 - assert neighborhood.get_max_projects() == 500 + # Check picker css styles + test_css_dict = {'barontop': '#444', + 'titlebarbackground': '#555', + 'projecttitlefont': 'arial,sans-serif', + 'projecttitlecolor': '#333', + 'titlebarcolor': '#666'} + css_text = neighborhood.compile_css_for_picker(test_css_dict) + assert '#333' in css_text + assert '#444' in css_text + assert '#555' in css_text + assert '#666' in css_text + assert 'arial,sans-serif' in css_text + neighborhood.css = css_text + styles_list = neighborhood.get_css_for_picker() + for style in styles_list: + assert test_css_dict[style['name']] == style['value'] - # Check picker css styles - test_css_dict = {'barontop': '#444', - 'titlebarbackground': '#555', - 'projecttitlefont': 'arial,sans-serif', - 'projecttitlecolor': '#333', - 'titlebarcolor': '#666'} - css_text = neighborhood.compile_css_for_picker(test_css_dict) - assert '#333' in css_text - assert '#444' in css_text - assert '#555' in css_text - assert '#666' in css_text - assert 'arial,sans-serif' in css_text - neighborhood.css = css_text - styles_list = neighborhood.get_css_for_picker() - for style in styles_list: - assert test_css_dict[style['name']] == style['value'] + # Check neighborhood custom css showing + neighborhood.features['css'] = 'none' + assert not neighborhood.allow_custom_css + neighborhood.features['css'] = 'picker' + assert neighborhood.allow_custom_css + neighborhood.features['css'] = 'custom' + assert neighborhood.allow_custom_css - # Check neighborhood custom css showing - neighborhood.features['css'] = 'none' - assert not neighborhood.allow_custom_css - neighborhood.features['css'] = 'picker' - assert neighborhood.allow_custom_css - neighborhood.features['css'] = 'custom' - assert neighborhood.allow_custom_css + neighborhood.anchored_tools = 'wiki:Wiki, tickets:Tickets' + assert neighborhood.get_anchored_tools()['wiki'] == 'Wiki' + assert neighborhood.get_anchored_tools()['tickets'] == 'Tickets' - neighborhood.anchored_tools = 'wiki:Wiki, tickets:Tickets' - assert neighborhood.get_anchored_tools()['wiki'] == 'Wiki' - assert neighborhood.get_anchored_tools()['tickets'] == 'Tickets' + neighborhood.prohibited_tools = 'wiki, tickets' + assert neighborhood.get_prohibited_tools() == ['wiki', 'tickets'] - neighborhood.prohibited_tools = 'wiki, tickets' - assert neighborhood.get_prohibited_tools() == ['wiki', 'tickets'] - - # Check properties - assert neighborhood.shortname == "p" + # Check properties + assert neighborhood.shortname == "p" diff --git a/Allura/allura/tests/model/test_oauth.py b/Allura/allura/tests/model/test_oauth.py index 67d12b094..edb396abc 100644 --- a/Allura/allura/tests/model/test_oauth.py +++ b/Allura/allura/tests/model/test_oauth.py @@ -16,28 +16,26 @@ # under the License. -from alluratest.tools import with_setup, assert_equal, assert_not_equal - from ming.odm import ThreadLocalORMSession from allura import model as M from alluratest.controller import setup_basic_test, setup_global_objects -def setup_method(): - setup_basic_test() - ThreadLocalORMSession.close_all() - setup_global_objects() +class TestOAuthModel: + def setup_method(self): + setup_basic_test() + ThreadLocalORMSession.close_all() + setup_global_objects() -@with_setup(setup_method) -def test_upsert(): - admin = M.User.by_username('test-admin') - user = M.User.by_username('test-user') - name = 'test-token' - token1 = M.OAuthConsumerToken.upsert(name, admin) - token2 = M.OAuthConsumerToken.upsert(name, admin) - token3 = M.OAuthConsumerToken.upsert(name, user) - assert M.OAuthConsumerToken.query.find().count() == 2 - assert token1._id == token2._id - assert token1._id != token3._id + def test_upsert(self): + admin = M.User.by_username('test-admin') + user = M.User.by_username('test-user') + name = 'test-token' + token1 = M.OAuthConsumerToken.upsert(name, admin) + token2 = M.OAuthConsumerToken.upsert(name, admin) + token3 = M.OAuthConsumerToken.upsert(name, user) + assert M.OAuthConsumerToken.query.find().count() == 2 + assert token1._id == token2._id + assert token1._id != token3._id diff --git a/Allura/allura/tests/model/test_project.py b/Allura/allura/tests/model/test_project.py index 524754bc3..6a1d44a8a 100644 --- a/Allura/allura/tests/model/test_project.py +++ b/Allura/allura/tests/model/test_project.py @@ -31,172 +31,161 @@ from allura.lib.exceptions import ToolError, Invalid from mock import MagicMock, patch -def setup_method(): - setup_basic_test() - setup_with_tools() - - [email protected]_wiki -def setup_with_tools(): - setup_global_objects() - - -@with_setup(setup_method) -def test_project(): - assert type(c.project.sidebar_menu()) == list - assert c.project.script_name in c.project.url() - old_proj = c.project - h.set_context('test/sub1', neighborhood='Projects') - assert type(c.project.sidebar_menu()) == list - assert type(c.project.sitemap()) == list - assert c.project.sitemap()[1].label == 'Admin' - assert old_proj in list(c.project.parent_iter()) - h.set_context('test', 'wiki', neighborhood='Projects') - adobe_nbhd = M.Neighborhood.query.get(name='Adobe') - p = M.Project.query.get( - shortname='adobe-1', neighborhood_id=adobe_nbhd._id) - # assert 'http' in p.url() # We moved adobe into /adobe/, not - # http://adobe.... - assert p.script_name in p.url() - assert c.project.shortname == 'test' - assert '<p>' in c.project.description_html - c.project.uninstall_app('hello-test-mount-point') - ThreadLocalORMSession.flush_all() - - c.project.install_app('Wiki', 'hello-test-mount-point') - c.project.support_page = 'hello-test-mount-point' - assert c.project.app_config('wiki').tool_name == 'wiki' - ThreadLocalORMSession.flush_all() - with td.raises(ToolError): - # already installed +class TestProjectModel: + + def setup_method(self): + setup_basic_test() + self.setup_with_tools() + + @td.with_wiki + def setup_with_tools(self): + setup_global_objects() + + def test_project(self): + assert type(c.project.sidebar_menu()) == list + assert c.project.script_name in c.project.url() + old_proj = c.project + h.set_context('test/sub1', neighborhood='Projects') + assert type(c.project.sidebar_menu()) == list + assert type(c.project.sitemap()) == list + assert c.project.sitemap()[1].label == 'Admin' + assert old_proj in list(c.project.parent_iter()) + h.set_context('test', 'wiki', neighborhood='Projects') + adobe_nbhd = M.Neighborhood.query.get(name='Adobe') + p = M.Project.query.get( + shortname='adobe-1', neighborhood_id=adobe_nbhd._id) + # assert 'http' in p.url() # We moved adobe into /adobe/, not + # http://adobe.... + assert p.script_name in p.url() + assert c.project.shortname == 'test' + assert '<p>' in c.project.description_html + c.project.uninstall_app('hello-test-mount-point') + ThreadLocalORMSession.flush_all() + c.project.install_app('Wiki', 'hello-test-mount-point') - ThreadLocalORMSession.flush_all() - c.project.uninstall_app('hello-test-mount-point') - ThreadLocalORMSession.flush_all() - with td.raises(ToolError): - # mount point reserved - c.project.install_app('Wiki', 'feed') - with td.raises(ToolError): - # mount point too long - c.project.install_app('Wiki', 'a' * 64) - with td.raises(ToolError): - # mount point must begin with letter - c.project.install_app('Wiki', '1') - # single letter mount points are allowed - c.project.install_app('Wiki', 'a') - # Make sure the project support page is reset if the tool it was pointing - # to is uninstalled. - assert c.project.support_page == '' - app_config = c.project.app_config('hello') - app_inst = c.project.app_instance(app_config) - app_inst = c.project.app_instance('hello') - app_inst = c.project.app_instance('hello2123') - c.project.breadcrumbs() - c.app.config.breadcrumbs() - - -@with_setup(setup_method) -def test_install_app_validates_options(): - from forgetracker.tracker_main import ForgeTrackerApp - name = 'TicketMonitoringEmail' - opt = [o for o in ForgeTrackerApp.config_options if o.name == name][0] - opt.validator = fev.Email(not_empty=True) - with patch.object(ForgeTrackerApp, 'config_on_install', new=[opt.name]): - for v in [None, '', 'bad@email']: - with td.raises(ToolError): - c.project.install_app('Tickets', 'test-tickets', **{name: v}) - assert c.project.app_instance('test-tickets') == None - c.project.install_app('Tickets', 'test-tickets', **{name: '[email protected]'}) - app = c.project.app_instance('test-tickets') - assert app.config.options[name] == '[email protected]' - - -def test_project_index(): - project, idx = c.project, c.project.index() - assert 'id' in idx - assert idx['id'] == project.index_id() - assert 'title' in idx - assert 'type_s' in idx - assert 'deleted_b' in idx - assert 'private_b' in idx - assert 'neighborhood_id_s' in idx - assert 'short_description_t' in idx - assert 'url_s' in idx - - -def test_subproject(): - project = M.Project.query.get(shortname='test') - with td.raises(ToolError): - with patch('allura.lib.plugin.ProjectRegistrationProvider') as Provider: - Provider.get().shortname_validator.to_python.side_effect = Invalid( - 'name', 'value', {}) - # name doesn't validate - sp = project.new_subproject('test-proj-nose') - sp = project.new_subproject('test-proj-nose') - spp = sp.new_subproject('spp') - ThreadLocalORMSession.flush_all() - sp.delete() - ThreadLocalORMSession.flush_all() - - [email protected]_wiki -def test_anchored_tools(): - c.project.neighborhood.anchored_tools = 'wiki:Wiki, tickets:Ticket' - c.project.install_app = MagicMock() - assert c.project.sitemap()[0].label == 'Wiki' - assert c.project.install_app.call_args[0][0] == 'tickets' - assert c.project.ordered_mounts()[0]['ac'].tool_name == 'wiki' - - -def test_set_ordinal_to_admin_tool(): - with h.push_config(c, - user=M.User.by_username('test-admin'), - project=M.Project.query.get(shortname='test')): - sm = c.project.sitemap() - assert sm[-1].tool_name == 'admin' - - -@with_setup(setup_method) -def test_users_and_roles(): - p = M.Project.query.get(shortname='test') - sub = p.direct_subprojects[0] - u = M.User.by_username('test-admin') - assert p.users_with_role('Admin') == [u] - assert p.users_with_role('Admin') == sub.users_with_role('Admin') - assert p.users_with_role('Admin') == p.admins() - - user = p.admins()[0] - user.disabled = True - ThreadLocalORMSession.flush_all() - assert p.users_with_role('Admin') == [] - assert p.users_with_role('Admin') == p.admins() - - -@with_setup(setup_method) -def test_project_disabled_users(): - p = M.Project.query.get(shortname='test') - users = p.users() - assert users[0].username == 'test-admin' - user = M.User.by_username('test-admin') - user.disabled = True - ThreadLocalORMSession.flush_all() - users = p.users() - assert users == [] - -def test_screenshot_unicode_serialization(): - p = M.Project.query.get(shortname='test') - screenshot_unicode = M.ProjectFile(project_id=p._id, category='screenshot', caption="ConSelección", filename='ConSelección.jpg') - screenshot_ascii = M.ProjectFile(project_id=p._id, category='screenshot', caption='test-screenshot', filename='test_file.jpg') - ThreadLocalORMSession.flush_all() - - serialized = p.__json__() - screenshots = sorted(serialized['screenshots'], key=lambda k: k['caption']) - - assert len(screenshots) == 2 - assert screenshots[0]['url'] == 'http://localhost/p/test/screenshot/ConSelecci%C3%B3n.jpg' - assert screenshots[0]['caption'] == "ConSelección" - assert screenshots[0]['thumbnail_url'] == 'http://localhost/p/test/screenshot/ConSelecci%C3%B3n.jpg/thumb' - - assert screenshots[1]['url'] == 'http://localhost/p/test/screenshot/test_file.jpg' - assert screenshots[1]['caption'] == 'test-screenshot' - assert screenshots[1]['thumbnail_url'] == 'http://localhost/p/test/screenshot/test_file.jpg/thumb' + c.project.support_page = 'hello-test-mount-point' + assert c.project.app_config('wiki').tool_name == 'wiki' + ThreadLocalORMSession.flush_all() + with td.raises(ToolError): + # already installed + c.project.install_app('Wiki', 'hello-test-mount-point') + ThreadLocalORMSession.flush_all() + c.project.uninstall_app('hello-test-mount-point') + ThreadLocalORMSession.flush_all() + with td.raises(ToolError): + # mount point reserved + c.project.install_app('Wiki', 'feed') + with td.raises(ToolError): + # mount point too long + c.project.install_app('Wiki', 'a' * 64) + with td.raises(ToolError): + # mount point must begin with letter + c.project.install_app('Wiki', '1') + # single letter mount points are allowed + c.project.install_app('Wiki', 'a') + # Make sure the project support page is reset if the tool it was pointing + # to is uninstalled. + assert c.project.support_page == '' + app_config = c.project.app_config('hello') + app_inst = c.project.app_instance(app_config) + app_inst = c.project.app_instance('hello') + app_inst = c.project.app_instance('hello2123') + c.project.breadcrumbs() + c.app.config.breadcrumbs() + + def test_install_app_validates_options(self): + from forgetracker.tracker_main import ForgeTrackerApp + name = 'TicketMonitoringEmail' + opt = [o for o in ForgeTrackerApp.config_options if o.name == name][0] + opt.validator = fev.Email(not_empty=True) + with patch.object(ForgeTrackerApp, 'config_on_install', new=[opt.name]): + for v in [None, '', 'bad@email']: + with td.raises(ToolError): + c.project.install_app('Tickets', 'test-tickets', **{name: v}) + assert c.project.app_instance('test-tickets') == None + c.project.install_app('Tickets', 'test-tickets', **{name: '[email protected]'}) + app = c.project.app_instance('test-tickets') + assert app.config.options[name] == '[email protected]' + + def test_project_index(self): + project, idx = c.project, c.project.index() + assert 'id' in idx + assert idx['id'] == project.index_id() + assert 'title' in idx + assert 'type_s' in idx + assert 'deleted_b' in idx + assert 'private_b' in idx + assert 'neighborhood_id_s' in idx + assert 'short_description_t' in idx + assert 'url_s' in idx + + def test_subproject(self): + project = M.Project.query.get(shortname='test') + with td.raises(ToolError): + with patch('allura.lib.plugin.ProjectRegistrationProvider') as Provider: + Provider.get().shortname_validator.to_python.side_effect = Invalid( + 'name', 'value', {}) + # name doesn't validate + sp = project.new_subproject('test-proj-nose') + sp = project.new_subproject('test-proj-nose') + spp = sp.new_subproject('spp') + ThreadLocalORMSession.flush_all() + sp.delete() + ThreadLocalORMSession.flush_all() + + @td.with_wiki + def test_anchored_tools(self): + c.project.neighborhood.anchored_tools = 'wiki:Wiki, tickets:Ticket' + c.project.install_app = MagicMock() + assert c.project.sitemap()[0].label == 'Wiki' + assert c.project.install_app.call_args[0][0] == 'tickets' + assert c.project.ordered_mounts()[0]['ac'].tool_name == 'wiki' + + def test_set_ordinal_to_admin_tool(self): + with h.push_config(c, + user=M.User.by_username('test-admin'), + project=M.Project.query.get(shortname='test')): + sm = c.project.sitemap() + assert sm[-1].tool_name == 'admin' + + def test_users_and_roles(self): + p = M.Project.query.get(shortname='test') + sub = p.direct_subprojects[0] + u = M.User.by_username('test-admin') + assert p.users_with_role('Admin') == [u] + assert p.users_with_role('Admin') == sub.users_with_role('Admin') + assert p.users_with_role('Admin') == p.admins() + + user = p.admins()[0] + user.disabled = True + ThreadLocalORMSession.flush_all() + assert p.users_with_role('Admin') == [] + assert p.users_with_role('Admin') == p.admins() + + def test_project_disabled_users(self): + p = M.Project.query.get(shortname='test') + users = p.users() + assert users[0].username == 'test-admin' + user = M.User.by_username('test-admin') + user.disabled = True + ThreadLocalORMSession.flush_all() + users = p.users() + assert users == [] + + def test_screenshot_unicode_serialization(self): + p = M.Project.query.get(shortname='test') + screenshot_unicode = M.ProjectFile(project_id=p._id, category='screenshot', caption="ConSelección", filename='ConSelección.jpg') + screenshot_ascii = M.ProjectFile(project_id=p._id, category='screenshot', caption='test-screenshot', filename='test_file.jpg') + ThreadLocalORMSession.flush_all() + + serialized = p.__json__() + screenshots = sorted(serialized['screenshots'], key=lambda k: k['caption']) + + assert len(screenshots) == 2 + assert screenshots[0]['url'] == 'http://localhost/p/test/screenshot/ConSelecci%C3%B3n.jpg' + assert screenshots[0]['caption'] == "ConSelección" + assert screenshots[0]['thumbnail_url'] == 'http://localhost/p/test/screenshot/ConSelecci%C3%B3n.jpg/thumb' + + assert screenshots[1]['url'] == 'http://localhost/p/test/screenshot/test_file.jpg' + assert screenshots[1]['caption'] == 'test-screenshot' + assert screenshots[1]['thumbnail_url'] == 'http://localhost/p/test/screenshot/test_file.jpg/thumb' diff --git a/Allura/allura/tests/test_app.py b/Allura/allura/tests/test_app.py index 71733a2d5..8a6924ce2 100644 --- a/Allura/allura/tests/test_app.py +++ b/Allura/allura/tests/test_app.py @@ -20,161 +20,152 @@ import mock from ming.base import Object import pytest from formencode import validators as fev +from textwrap import dedent from alluratest.controller import setup_unit_test from alluratest.tools import with_setup from allura import app from allura.lib.app_globals import Icon from allura.lib import mail_util -from alluratest.pytest_helpers import with_nose_compatibility - - -def setup_method(): - setup_unit_test() - c.user._id = None - c.project = mock.Mock() - c.project.name = 'Test Project' - c.project.shortname = 'tp' - c.project._id = 'testproject/' - c.project.url = lambda: '/testproject/' - app_config = mock.Mock() - app_config._id = None - app_config.project_id = 'testproject/' - app_config.tool_name = 'tool' - app_config.options = Object(mount_point='foo') - c.app = mock.Mock() - c.app.config = app_config - c.app.config.script_name = lambda: '/testproject/test_application/' - c.app.config.url = lambda: 'http://testproject/test_application/' - c.app.url = c.app.config.url() - c.app.__version__ = '0.0' - -def test_config_options(): - options = [ - app.ConfigOption('test1', str, 'MyTestValue'), - app.ConfigOption('test2', str, lambda:'MyTestValue')] - assert options[0].default == 'MyTestValue' - assert options[1].default == 'MyTestValue' - - -def test_config_options_render_attrs(): - opt = app.ConfigOption('test1', str, None, extra_attrs={'type': 'url'}) - assert opt.render_attrs() == 'type="url"' - - -def test_config_option_without_validator(): - opt = app.ConfigOption('test1', str, None) - assert opt.validate(None) == None - assert opt.validate('') == '' - assert opt.validate('val') == 'val' - - -def test_config_option_with_validator(): - v = fev.NotEmpty() - opt = app.ConfigOption('test1', str, None, validator=v) - assert opt.validate('val') == 'val' - pytest.raises(fev.Invalid, opt.validate, None) - pytest.raises(fev.Invalid, opt.validate, '') - - -@with_setup(setup_method) -def test_options_on_install_default(): - a = app.Application(c.project, c.app.config) - assert a.options_on_install() == [] - - -@with_setup(setup_method) -def test_options_on_install(): - opts = [app.ConfigOption('url', str, None), - app.ConfigOption('private', bool, None)] - class TestApp(app.Application): - config_options = app.Application.config_options + opts + [ - app.ConfigOption('not_on_install', str, None), - ] - config_on_install = ['url', 'private'] - - a = TestApp(c.project, c.app.config) - assert a.options_on_install() == opts - -@with_setup(setup_method) -def test_main_menu(): - class TestApp(app.Application): - @property - def sitemap(self): - children = [app.SitemapEntry('New', 'new', ui_icon=Icon('some-icon')), - app.SitemapEntry('Recent', 'recent'), - ] - return [app.SitemapEntry('My Tool', '.')[children]] - - a = TestApp(c.project, c.app.config) - main_menu = a.main_menu() - assert len(main_menu) == 1 - assert main_menu[0].children == [] # default main_menu implementation should drop the children from sitemap() - - -@with_setup(setup_method) -def test_sitemap(): - sm = app.SitemapEntry('test', '')[ - app.SitemapEntry('a', 'a/'), - app.SitemapEntry('b', 'b/')] - sm[app.SitemapEntry(lambda app:app.config.script_name(), 'c/')] - bound_sm = sm.bind_app(c.app) - assert bound_sm.url == 'http://testproject/test_application/', bound_sm.url - assert bound_sm.children[ - -1].label == '/testproject/test_application/', bound_sm.children[-1].label - assert len(sm.children) == 3 - sm.extend([app.SitemapEntry('a', 'a/')[ - app.SitemapEntry('d', 'd/')]]) - assert len(sm.children) == 3 - - -@with_setup(setup_method) [email protected]('allura.app.Application.PostClass.query.get') -def test_handle_artifact_unicode(qg): - """ - Tests that app.handle_artifact_message can accept utf strings - """ - ticket = mock.MagicMock() - ticket.get_discussion_thread.return_value = (mock.MagicMock(), mock.MagicMock()) - post = mock.MagicMock() - qg.return_value = post - - a = app.Application(c.project, c.app.config) - - msg = dict(payload='foo ƒ†©¥˙¨ˆ'.encode(), message_id=1, headers={}) - a.handle_artifact_message(ticket, msg) - assert post.attach.call_args[0][1].getvalue() == 'foo ƒ†©¥˙¨ˆ'.encode() - - msg = dict(payload=b'foo', message_id=1, headers={}) - a.handle_artifact_message(ticket, msg) - assert post.attach.call_args[0][1].getvalue() == b'foo' - - msg = dict(payload="\x94my quote\x94".encode(), message_id=1, headers={}) - a.handle_artifact_message(ticket, msg) - assert post.attach.call_args[0][1].getvalue() == '\x94my quote\x94'.encode() - - # assert against prod example - msg_raw = """Message-Id: <1502352031.3216858.1068961568.19ef4...@webmail.messagingengine.com> -From: foo <[email protected]> -To: "[forge:site-support]" <[email protected]> -MIME-Version: 1.0 -Content-Transfer-Encoding: 7bit -Content-Type: multipart/alternative; boundary="_----------=_150235203132168580" -Date: Thu, 10 Aug 2017 10:00:31 +0200 -Subject: Re: [forge:site-support] #15391 Unable to join (my own) mailing list -This is a multi-part message in MIME format. ---_----------=_150235203132168580 -Content-Transfer-Encoding: quoted-printable -Content-Type: text/plain; charset="utf-8" -Hi ---_----------=_150235203132168580 -Content-Transfer-Encoding: quoted-printable -Content-Type: text/html; charset="utf-8" -<!DOCTYPE html> -<html><body>Hi</body></html> ---_----------=_150235203132168580-- - """ - msg = mail_util.parse_message(msg_raw) - for p in [p for p in msg['parts'] if p['payload'] is not None]: - # filter here mimics logic in `route_email` - a.handle_artifact_message(ticket, p) + + +class TestApp: + + def setup_method(self): + setup_unit_test() + c.user._id = None + c.project = mock.Mock() + c.project.name = 'Test Project' + c.project.shortname = 'tp' + c.project._id = 'testproject/' + c.project.url = lambda: '/testproject/' + app_config = mock.Mock() + app_config._id = None + app_config.project_id = 'testproject/' + app_config.tool_name = 'tool' + app_config.options = Object(mount_point='foo') + c.app = mock.Mock() + c.app.config = app_config + c.app.config.script_name = lambda: '/testproject/test_application/' + c.app.config.url = lambda: 'http://testproject/test_application/' + c.app.url = c.app.config.url() + c.app.__version__ = '0.0' + + def test_config_options(self): + options = [ + app.ConfigOption('test1', str, 'MyTestValue'), + app.ConfigOption('test2', str, lambda:'MyTestValue')] + assert options[0].default == 'MyTestValue' + assert options[1].default == 'MyTestValue' + + def test_config_options_render_attrs(self): + opt = app.ConfigOption('test1', str, None, extra_attrs={'type': 'url'}) + assert opt.render_attrs() == 'type="url"' + + def test_config_option_without_validator(self): + opt = app.ConfigOption('test1', str, None) + assert opt.validate(None) == None + assert opt.validate('') == '' + assert opt.validate('val') == 'val' + + def test_config_option_with_validator(self): + v = fev.NotEmpty() + opt = app.ConfigOption('test1', str, None, validator=v) + assert opt.validate('val') == 'val' + pytest.raises(fev.Invalid, opt.validate, None) + pytest.raises(fev.Invalid, opt.validate, '') + + def test_options_on_install_default(self): + a = app.Application(c.project, c.app.config) + assert a.options_on_install() == [] + + def test_options_on_install(self): + opts = [app.ConfigOption('url', str, None), + app.ConfigOption('private', bool, None)] + class TestApp(app.Application): + config_options = app.Application.config_options + opts + [ + app.ConfigOption('not_on_install', str, None), + ] + config_on_install = ['url', 'private'] + + a = TestApp(c.project, c.app.config) + assert a.options_on_install() == opts + + def test_main_menu(self): + class TestApp(app.Application): + @property + def sitemap(self): + children = [app.SitemapEntry('New', 'new', ui_icon=Icon('some-icon')), + app.SitemapEntry('Recent', 'recent'), + ] + return [app.SitemapEntry('My Tool', '.')[children]] + + a = TestApp(c.project, c.app.config) + main_menu = a.main_menu() + assert len(main_menu) == 1 + assert main_menu[0].children == [] # default main_menu implementation should drop the children from sitemap() + + def test_sitemap(self): + sm = app.SitemapEntry('test', '')[ + app.SitemapEntry('a', 'a/'), + app.SitemapEntry('b', 'b/')] + sm[app.SitemapEntry(lambda app:app.config.script_name(), 'c/')] + bound_sm = sm.bind_app(c.app) + assert bound_sm.url == 'http://testproject/test_application/', bound_sm.url + assert bound_sm.children[ + -1].label == '/testproject/test_application/', bound_sm.children[-1].label + assert len(sm.children) == 3 + sm.extend([app.SitemapEntry('a', 'a/')[ + app.SitemapEntry('d', 'd/')]]) + assert len(sm.children) == 3 + + @mock.patch('allura.app.Application.PostClass.query.get') + def test_handle_artifact_unicode(self, qg): + """ + Tests that app.handle_artifact_message can accept utf strings + """ + ticket = mock.MagicMock() + ticket.get_discussion_thread.return_value = (mock.MagicMock(), mock.MagicMock()) + post = mock.MagicMock() + qg.return_value = post + + a = app.Application(c.project, c.app.config) + + msg = dict(payload='foo ƒ†©¥˙¨ˆ'.encode(), message_id=1, headers={}) + a.handle_artifact_message(ticket, msg) + assert post.attach.call_args[0][1].getvalue() == 'foo ƒ†©¥˙¨ˆ'.encode() + + msg = dict(payload=b'foo', message_id=1, headers={}) + a.handle_artifact_message(ticket, msg) + assert post.attach.call_args[0][1].getvalue() == b'foo' + + msg = dict(payload="\x94my quote\x94".encode(), message_id=1, headers={}) + a.handle_artifact_message(ticket, msg) + assert post.attach.call_args[0][1].getvalue() == '\x94my quote\x94'.encode() + + # assert against prod example + msg_raw = dedent("""\ + Message-Id: <1502352031.3216858.1068961568.19ef4...@webmail.messagingengine.com> + From: foo <[email protected]> + To: "[forge:site-support]" <[email protected]> + MIME-Version: 1.0 + Content-Transfer-Encoding: 7bit + Content-Type: multipart/alternative; boundary="_----------=_150235203132168580" + Date: Thu, 10 Aug 2017 10:00:31 +0200 + Subject: Re: [forge:site-support] #15391 Unable to join (my own) mailing list + This is a multi-part message in MIME format. + --_----------=_150235203132168580 + Content-Transfer-Encoding: quoted-printable + Content-Type: text/plain; charset="utf-8" + Hi + --_----------=_150235203132168580 + Content-Transfer-Encoding: quoted-printable + Content-Type: text/html; charset="utf-8" + <!DOCTYPE html> + <html><body>Hi</body></html> + --_----------=_150235203132168580-- + """) + msg = mail_util.parse_message(msg_raw) + for p in [p for p in msg['parts'] if p['payload'] is not None]: + # filter here mimics logic in `route_email` + a.handle_artifact_message(ticket, p)
