This is an automated email from the ASF dual-hosted git repository. dill0wn pushed a commit to branch dw/8455-part2 in repository https://gitbox.apache.org/repos/asf/allura.git
commit 7805b2940404e8d5a6d1a139624a7bb8ddf193ed Author: Dillon Walls <[email protected]> AuthorDate: Thu Sep 29 20:37:20 2022 +0000 fixup! fixup! [#8455] remove @with_setup --- Allura/allura/tests/model/test_discussion.py | 966 +++++++++++++-------------- 1 file changed, 460 insertions(+), 506 deletions(-) diff --git a/Allura/allura/tests/model/test_discussion.py b/Allura/allura/tests/model/test_discussion.py index d5abf1a05..3c158cd3f 100644 --- a/Allura/allura/tests/model/test_discussion.py +++ b/Allura/allura/tests/model/test_discussion.py @@ -24,10 +24,8 @@ from datetime import datetime, timedelta from cgi import FieldStorage from tg import tmpl_context as c -from alluratest.tools import assert_equals, with_setup import mock from mock import patch -from alluratest.tools import assert_equal, assert_in from ming.orm import session, ThreadLocalORMSession from webob import exc @@ -38,513 +36,469 @@ from allura.tests import TestController from alluratest.controller import setup_global_objects -def setup_method(): - controller = TestController() - controller.setup_method(None) - controller.app.get('/wiki/Home/') - setup_global_objects() - ThreadLocalORMSession.close_all() - h.set_context('test', 'wiki', neighborhood='Projects') - ThreadLocalORMSession.flush_all() - ThreadLocalORMSession.close_all() - - -def teardown_module(): - ThreadLocalORMSession.close_all() - - -@with_setup(setup_method) -def test_discussion_methods(): - d = M.Discussion(shortname='test', name='test') - assert d.thread_class() == M.Thread - assert d.post_class() == M.Post - assert d.attachment_class() == M.DiscussionAttachment - ThreadLocalORMSession.flush_all() - d.update_stats() - ThreadLocalORMSession.flush_all() - assert d.last_post is None - assert d.url().endswith('wiki/_discuss/') - assert d.index()['name_s'] == 'test' - assert d.find_posts().count() == 0 - jsn = d.__json__() - assert jsn['name'] == d.name - d.delete() - ThreadLocalORMSession.flush_all() - ThreadLocalORMSession.close_all() - - -@with_setup(setup_method) -def test_thread_methods(): - d = M.Discussion(shortname='test', name='test') - t = M.Thread.new(discussion_id=d._id, subject='Test Thread') - assert t.discussion_class() == M.Discussion - assert t.post_class() == M.Post - assert t.attachment_class() == M.DiscussionAttachment - p0 = t.post('This is a post') - p1 = t.post('This is another post') - time.sleep(0.25) - t.post('This is a reply', parent_id=p0._id) - ThreadLocalORMSession.flush_all() - ThreadLocalORMSession.close_all() - d = M.Discussion.query.get(shortname='test') - t = d.threads[0] - assert d.last_post is not None - assert t.last_post is not None - t.create_post_threads(t.posts) - posts0 = t.find_posts(page=0, limit=10, style='threaded') - posts1 = t.find_posts(page=0, limit=10, style='timestamp') - assert posts0 != posts1 - ts = p0.timestamp.replace( - microsecond=int(p0.timestamp.microsecond // 1000) * 1000) - posts2 = t.find_posts(page=0, limit=10, style='threaded', timestamp=ts) - assert len(posts2) > 0 - - assert 'wiki/_discuss/' in t.url() - assert t.index()['views_i'] == 0 - assert t.post_count == 3 - jsn = t.__json__() - assert '_id' in jsn - assert len(jsn['posts']) == 3 - (p.approve() for p in (p0, p1)) - ThreadLocalORMSession.flush_all() - assert t.num_replies == 3 - t.spam() - assert t.num_replies == 0 - ThreadLocalORMSession.flush_all() - assert len(t.find_posts()) == 0 - t.delete() - - -@with_setup(setup_method) -def test_thread_new(): - with mock.patch('allura.model.discuss.h.nonce') as nonce: - nonce.side_effect = ['deadbeef', 'deadbeef', 'beefdead'] +class TestDiscussion: + + def setup_method(self): + controller = TestController() + controller.setup_method(None) + controller.app.get('/wiki/Home/') + setup_global_objects() + ThreadLocalORMSession.close_all() + h.set_context('test', 'wiki', neighborhood='Projects') + ThreadLocalORMSession.flush_all() + ThreadLocalORMSession.close_all() + + @classmethod + def teardown_class(cls): + ThreadLocalORMSession.close_all() + + def test_discussion_methods(self): d = M.Discussion(shortname='test', name='test') - t1 = M.Thread.new(discussion_id=d._id, subject='Test Thread One') - t2 = M.Thread.new(discussion_id=d._id, subject='Test Thread Two') + assert d.thread_class() == M.Thread + assert d.post_class() == M.Post + assert d.attachment_class() == M.DiscussionAttachment ThreadLocalORMSession.flush_all() - session(t1).expunge(t1) - session(t2).expunge(t2) - t1_2 = M.Thread.query.get(_id=t1._id) - t2_2 = M.Thread.query.get(_id=t2._id) - assert t1._id == 'deadbeef' - assert t2._id == 'beefdead' - assert t1_2.subject == 'Test Thread One' - assert t2_2.subject == 'Test Thread Two' - - -@with_setup(setup_method) -def test_post_methods(): - d = M.Discussion(shortname='test', name='test') - t = M.Thread.new(discussion_id=d._id, subject='Test Thread') - p = t.post('This is a post') - p2 = t.post('This is another post') - assert p.discussion_class() == M.Discussion - assert p.thread_class() == M.Thread - assert p.attachment_class() == M.DiscussionAttachment - p.commit() - assert p.parent is None - assert p.subject == 'Test Thread' - assert p.attachments == [] - assert 'wiki/_discuss' in p.url() - assert p.reply_subject() == 'Re: Test Thread' - assert p.link_text() == p.subject - - ss = p.history().first() - assert 'version' in h.get_first(ss.index(), 'title') - assert '#' in ss.shorthand_id() - - jsn = p.__json__() - assert jsn["thread_id"] == t._id - - (p.approve() for p in (p, p2)) - ThreadLocalORMSession.flush_all() - assert t.num_replies == 2 - p.spam() - assert t.num_replies == 1 - p.undo('ok') - assert t.num_replies == 2 - p.delete() - assert t.num_replies == 1 - - -@with_setup(setup_method) -def test_attachment_methods(): - d = M.Discussion(shortname='test', name='test') - t = M.Thread.new(discussion_id=d._id, subject='Test Thread') - p = t.post('This is a post') - p_att = p.attach('foo.text', BytesIO(b'Hello, world!'), - discussion_id=d._id, - thread_id=t._id, - post_id=p._id) - t_att = p.attach('foo2.text', BytesIO(b'Hello, thread!'), - discussion_id=d._id, - thread_id=t._id) - d_att = p.attach('foo3.text', BytesIO(b'Hello, discussion!'), - discussion_id=d._id) - - ThreadLocalORMSession.flush_all() - assert p_att.post == p - assert p_att.thread == t - assert p_att.discussion == d - for att in (p_att, t_att, d_att): - assert 'wiki/_discuss' in att.url() - assert 'attachment/' in att.url() - - # Test notification in mail - t = M.Thread.new(discussion_id=d._id, subject='Test comment notification') - fs = FieldStorage() - fs.name = 'file_info' - fs.filename = 'fake.txt' - fs.type = 'text/plain' - fs.file = BytesIO(b'this is the content of the fake file\n') - p = t.post(text='test message', forum=None, subject='', file_info=fs) - ThreadLocalORMSession.flush_all() - n = M.Notification.query.get( - subject='[test:wiki] Test comment notification') - url = h.absurl(f'{p.url()}attachment/{fs.filename}') - assert ( - '\nAttachments:\n\n' - '- [fake.txt]({}) (37 Bytes; text/plain)'.format(url) in - n.text) - - -@with_setup(setup_method) -def test_multiple_attachments(): - test_file1 = FieldStorage() - test_file1.name = 'file_info' - test_file1.filename = 'test1.txt' - test_file1.type = 'text/plain' - test_file1.file = BytesIO(b'test file1\n') - test_file2 = FieldStorage() - test_file2.name = 'file_info' - test_file2.filename = 'test2.txt' - test_file2.type = 'text/plain' - test_file2.file = BytesIO(b'test file2\n') - d = M.Discussion(shortname='test', name='test') - t = M.Thread.new(discussion_id=d._id, subject='Test Thread') - test_post = t.post('test post') - test_post.add_multiple_attachments([test_file1, test_file2]) - ThreadLocalORMSession.flush_all() - assert len(test_post.attachments) == 2 - attaches = test_post.attachments - assert 'test1.txt' in [attaches[0].filename, attaches[1].filename] - assert 'test2.txt' in [attaches[0].filename, attaches[1].filename] - - -@with_setup(setup_method) -def test_add_attachment(): - test_file = FieldStorage() - test_file.name = 'file_info' - test_file.filename = 'test.txt' - test_file.type = 'text/plain' - test_file.file = BytesIO(b'test file\n') - d = M.Discussion(shortname='test', name='test') - t = M.Thread.new(discussion_id=d._id, subject='Test Thread') - test_post = t.post('test post') - test_post.add_attachment(test_file) - ThreadLocalORMSession.flush_all() - assert len(test_post.attachments) == 1 - attach = test_post.attachments[0] - assert attach.filename == 'test.txt', attach.filename - assert attach.content_type == 'text/plain', attach.content_type - - -def test_notification_two_attaches(): - d = M.Discussion(shortname='test', name='test') - t = M.Thread.new(discussion_id=d._id, subject='Test comment notification') - fs1 = FieldStorage() - fs1.name = 'file_info' - fs1.filename = 'fake.txt' - fs1.type = 'text/plain' - fs1.file = BytesIO(b'this is the content of the fake file\n') - fs2 = FieldStorage() - fs2.name = 'file_info' - fs2.filename = 'fake2.txt' - fs2.type = 'text/plain' - fs2.file = BytesIO(b'this is the content of the fake file\n') - p = t.post(text='test message', forum=None, subject='', file_info=[fs1, fs2]) - ThreadLocalORMSession.flush_all() - n = M.Notification.query.get( - subject='[test:wiki] Test comment notification') - base_url = h.absurl(f'{p.url()}attachment/') - assert ( - '\nAttachments:\n\n' - '- [fake.txt]({0}fake.txt) (37 Bytes; text/plain)\n' - '- [fake2.txt]({0}fake2.txt) (37 Bytes; text/plain)'.format(base_url) in - n.text) - - -@with_setup(setup_method) -def test_discussion_delete(): - d = M.Discussion(shortname='test', name='test') - t = M.Thread.new(discussion_id=d._id, subject='Test Thread') - p = t.post('This is a post') - p.attach('foo.text', BytesIO(b''), - discussion_id=d._id, - thread_id=t._id, - post_id=p._id) - M.ArtifactReference.from_artifact(d) - rid = d.index_id() - ThreadLocalORMSession.flush_all() - d.delete() - ThreadLocalORMSession.flush_all() - assert M.ArtifactReference.query.find(dict(_id=rid)).count() == 0 - - -@with_setup(setup_method) -def test_thread_delete(): - d = M.Discussion(shortname='test', name='test') - t = M.Thread.new(discussion_id=d._id, subject='Test Thread') - p = t.post('This is a post') - p.attach('foo.text', BytesIO(b''), - discussion_id=d._id, - thread_id=t._id, - post_id=p._id) - ThreadLocalORMSession.flush_all() - t.delete() - - -@with_setup(setup_method) -def test_post_delete(): - d = M.Discussion(shortname='test', name='test') - t = M.Thread.new(discussion_id=d._id, subject='Test Thread') - p = t.post('This is a post') - p.attach('foo.text', BytesIO(b''), - discussion_id=d._id, - thread_id=t._id, - post_id=p._id) - ThreadLocalORMSession.flush_all() - p.delete() - - -@with_setup(setup_method) -def test_post_undo(): - d = M.Discussion(shortname='test', name='test') - t = M.Thread.new(discussion_id=d._id, subject='Test Thread') - p = t.post('This is a post') - t.post('This is a post2') - t.post('This is a post3') - ThreadLocalORMSession.flush_all() - assert t.num_replies == 3 - p.spam() - assert t.num_replies == 2 - p.undo('ok') - assert t.num_replies == 3 - - -@with_setup(setup_method) -def test_post_permission_check(): - d = M.Discussion(shortname='test', name='test') - t = M.Thread.new(discussion_id=d._id, subject='Test Thread') - c.user = M.User.anonymous() - try: - t.post('This post will fail the check.') - assert False, "Expected an anonymous post to fail." - except exc.HTTPUnauthorized: - pass - t.post('This post will pass the check.', ignore_security=True) - - -@with_setup(setup_method) -def test_post_url_paginated(): - d = M.Discussion(shortname='test', name='test') - t = M.Thread(discussion_id=d._id, subject='Test Thread') - p = [] # posts in display order - ts = datetime.utcnow() - timedelta(days=1) - for i in range(5): - ts += timedelta(minutes=1) - p.append(t.post('This is a post #%s' % i, timestamp=ts)) - - ts += timedelta(minutes=1) - p.insert(1, t.post( - 'This is reply #0 to post #0', parent_id=p[0]._id, timestamp=ts)) - - ts += timedelta(minutes=1) - p.insert(2, t.post( - 'This is reply #1 to post #0', parent_id=p[0]._id, timestamp=ts)) - - ts += timedelta(minutes=1) - p.insert(4, t.post( - 'This is reply #0 to post #1', parent_id=p[3]._id, timestamp=ts)) - - ts += timedelta(minutes=1) - p.insert(6, t.post( - 'This is reply #0 to post #2', parent_id=p[5]._id, timestamp=ts)) - - ts += timedelta(minutes=1) - p.insert(7, t.post( - 'This is reply #1 to post #2', parent_id=p[5]._id, timestamp=ts)) - - ts += timedelta(minutes=1) - p.insert(8, t.post( - 'This is reply #0 to reply #1 to post #2', - parent_id=p[7]._id, timestamp=ts)) - - # with default paging limit - for _p in p: - url = t.url() + '?limit=25#' + _p.slug - assert _p.url_paginated() == url, _p.url_paginated() - - # with user paging limit - limit = 3 - c.user.set_pref('results_per_page', limit) - for i, _p in enumerate(p): - page = i // limit - url = t.url() + '?limit=%s' % limit - if page > 0: - url += '&page=%s' % page - url += '#' + _p.slug - assert _p.url_paginated() == url - - -@with_setup(setup_method) -def test_post_url_paginated_with_artifact(): - """Post.url_paginated should return link to attached artifact, if any""" - from forgewiki.model import Page - page = Page.upsert(title='Test Page') - thread = page.discussion_thread - comment = thread.post('Comment') - url = page.url() + '?limit=25#' + comment.slug - assert comment.url_paginated() == url - - -@with_setup(setup_method) -def test_post_notify(): - d = M.Discussion(shortname='test', name='test') - d.monitoring_email = '[email protected]' - t = M.Thread.new(discussion_id=d._id, subject='Test Thread') - with patch('allura.model.notification.Notification.send_simple') as send: - t.post('This is a post') - send.assert_called_with(d.monitoring_email) + d.update_stats() + ThreadLocalORMSession.flush_all() + assert d.last_post is None + assert d.url().endswith('wiki/_discuss/') + assert d.index()['name_s'] == 'test' + assert d.find_posts().count() == 0 + jsn = d.__json__() + assert jsn['name'] == d.name + d.delete() + ThreadLocalORMSession.flush_all() + ThreadLocalORMSession.close_all() + + def test_thread_methods(self): + d = M.Discussion(shortname='test', name='test') + t = M.Thread.new(discussion_id=d._id, subject='Test Thread') + assert t.discussion_class() == M.Discussion + assert t.post_class() == M.Post + assert t.attachment_class() == M.DiscussionAttachment + p0 = t.post('This is a post') + p1 = t.post('This is another post') + time.sleep(0.25) + t.post('This is a reply', parent_id=p0._id) + ThreadLocalORMSession.flush_all() + ThreadLocalORMSession.close_all() + d = M.Discussion.query.get(shortname='test') + t = d.threads[0] + assert d.last_post is not None + assert t.last_post is not None + t.create_post_threads(t.posts) + posts0 = t.find_posts(page=0, limit=10, style='threaded') + posts1 = t.find_posts(page=0, limit=10, style='timestamp') + assert posts0 != posts1 + ts = p0.timestamp.replace( + microsecond=int(p0.timestamp.microsecond // 1000) * 1000) + posts2 = t.find_posts(page=0, limit=10, style='threaded', timestamp=ts) + assert len(posts2) > 0 + + assert 'wiki/_discuss/' in t.url() + assert t.index()['views_i'] == 0 + assert t.post_count == 3 + jsn = t.__json__() + assert '_id' in jsn + assert len(jsn['posts']) == 3 + (p.approve() for p in (p0, p1)) + ThreadLocalORMSession.flush_all() + assert t.num_replies == 3 + t.spam() + assert t.num_replies == 0 + ThreadLocalORMSession.flush_all() + assert len(t.find_posts()) == 0 + t.delete() + + def test_thread_new(self): + with mock.patch('allura.model.discuss.h.nonce') as nonce: + nonce.side_effect = ['deadbeef', 'deadbeef', 'beefdead'] + d = M.Discussion(shortname='test', name='test') + t1 = M.Thread.new(discussion_id=d._id, subject='Test Thread One') + t2 = M.Thread.new(discussion_id=d._id, subject='Test Thread Two') + ThreadLocalORMSession.flush_all() + session(t1).expunge(t1) + session(t2).expunge(t2) + t1_2 = M.Thread.query.get(_id=t1._id) + t2_2 = M.Thread.query.get(_id=t2._id) + assert t1._id == 'deadbeef' + assert t2._id == 'beefdead' + assert t1_2.subject == 'Test Thread One' + assert t2_2.subject == 'Test Thread Two' + + def test_post_methods(self): + d = M.Discussion(shortname='test', name='test') + t = M.Thread.new(discussion_id=d._id, subject='Test Thread') + p = t.post('This is a post') + p2 = t.post('This is another post') + assert p.discussion_class() == M.Discussion + assert p.thread_class() == M.Thread + assert p.attachment_class() == M.DiscussionAttachment + p.commit() + assert p.parent is None + assert p.subject == 'Test Thread' + assert p.attachments == [] + assert 'wiki/_discuss' in p.url() + assert p.reply_subject() == 'Re: Test Thread' + assert p.link_text() == p.subject + + ss = p.history().first() + assert 'version' in h.get_first(ss.index(), 'title') + assert '#' in ss.shorthand_id() + + jsn = p.__json__() + assert jsn["thread_id"] == t._id + + (p.approve() for p in (p, p2)) + ThreadLocalORMSession.flush_all() + assert t.num_replies == 2 + p.spam() + assert t.num_replies == 1 + p.undo('ok') + assert t.num_replies == 2 + p.delete() + assert t.num_replies == 1 + + def test_attachment_methods(self): + d = M.Discussion(shortname='test', name='test') + t = M.Thread.new(discussion_id=d._id, subject='Test Thread') + p = t.post('This is a post') + p_att = p.attach('foo.text', BytesIO(b'Hello, world!'), + discussion_id=d._id, + thread_id=t._id, + post_id=p._id) + t_att = p.attach('foo2.text', BytesIO(b'Hello, thread!'), + discussion_id=d._id, + thread_id=t._id) + d_att = p.attach('foo3.text', BytesIO(b'Hello, discussion!'), + discussion_id=d._id) + + ThreadLocalORMSession.flush_all() + assert p_att.post == p + assert p_att.thread == t + assert p_att.discussion == d + for att in (p_att, t_att, d_att): + assert 'wiki/_discuss' in att.url() + assert 'attachment/' in att.url() + + # Test notification in mail + t = M.Thread.new(discussion_id=d._id, subject='Test comment notification') + fs = FieldStorage() + fs.name = 'file_info' + fs.filename = 'fake.txt' + fs.type = 'text/plain' + fs.file = BytesIO(b'this is the content of the fake file\n') + p = t.post(text='test message', forum=None, subject='', file_info=fs) + ThreadLocalORMSession.flush_all() + n = M.Notification.query.get( + subject='[test:wiki] Test comment notification') + url = h.absurl(f'{p.url()}attachment/{fs.filename}') + assert ( + '\nAttachments:\n\n' + '- [fake.txt]({}) (37 Bytes; text/plain)'.format(url) in + n.text) + + def test_multiple_attachments(self): + test_file1 = FieldStorage() + test_file1.name = 'file_info' + test_file1.filename = 'test1.txt' + test_file1.type = 'text/plain' + test_file1.file = BytesIO(b'test file1\n') + test_file2 = FieldStorage() + test_file2.name = 'file_info' + test_file2.filename = 'test2.txt' + test_file2.type = 'text/plain' + test_file2.file = BytesIO(b'test file2\n') + d = M.Discussion(shortname='test', name='test') + t = M.Thread.new(discussion_id=d._id, subject='Test Thread') + test_post = t.post('test post') + test_post.add_multiple_attachments([test_file1, test_file2]) + ThreadLocalORMSession.flush_all() + assert len(test_post.attachments) == 2 + attaches = test_post.attachments + assert 'test1.txt' in [attaches[0].filename, attaches[1].filename] + assert 'test2.txt' in [attaches[0].filename, attaches[1].filename] + + def test_add_attachment(self): + test_file = FieldStorage() + test_file.name = 'file_info' + test_file.filename = 'test.txt' + test_file.type = 'text/plain' + test_file.file = BytesIO(b'test file\n') + d = M.Discussion(shortname='test', name='test') + t = M.Thread.new(discussion_id=d._id, subject='Test Thread') + test_post = t.post('test post') + test_post.add_attachment(test_file) + ThreadLocalORMSession.flush_all() + assert len(test_post.attachments) == 1 + attach = test_post.attachments[0] + assert attach.filename == 'test.txt', attach.filename + assert attach.content_type == 'text/plain', attach.content_type + + def test_notification_two_attaches(self): + d = M.Discussion(shortname='test', name='test') + t = M.Thread.new(discussion_id=d._id, subject='Test comment notification') + fs1 = FieldStorage() + fs1.name = 'file_info' + fs1.filename = 'fake.txt' + fs1.type = 'text/plain' + fs1.file = BytesIO(b'this is the content of the fake file\n') + fs2 = FieldStorage() + fs2.name = 'file_info' + fs2.filename = 'fake2.txt' + fs2.type = 'text/plain' + fs2.file = BytesIO(b'this is the content of the fake file\n') + p = t.post(text='test message', forum=None, subject='', file_info=[fs1, fs2]) + ThreadLocalORMSession.flush_all() + n = M.Notification.query.get( + subject='[test:wiki] Test comment notification') + base_url = h.absurl(f'{p.url()}attachment/') + assert ( + '\nAttachments:\n\n' + '- [fake.txt]({0}fake.txt) (37 Bytes; text/plain)\n' + '- [fake2.txt]({0}fake2.txt) (37 Bytes; text/plain)'.format(base_url) in + n.text) + + def test_discussion_delete(self): + d = M.Discussion(shortname='test', name='test') + t = M.Thread.new(discussion_id=d._id, subject='Test Thread') + p = t.post('This is a post') + p.attach('foo.text', BytesIO(b''), + discussion_id=d._id, + thread_id=t._id, + post_id=p._id) + M.ArtifactReference.from_artifact(d) + rid = d.index_id() + ThreadLocalORMSession.flush_all() + d.delete() + ThreadLocalORMSession.flush_all() + assert M.ArtifactReference.query.find(dict(_id=rid)).count() == 0 + + def test_thread_delete(self): + d = M.Discussion(shortname='test', name='test') + t = M.Thread.new(discussion_id=d._id, subject='Test Thread') + p = t.post('This is a post') + p.attach('foo.text', BytesIO(b''), + discussion_id=d._id, + thread_id=t._id, + post_id=p._id) + ThreadLocalORMSession.flush_all() + t.delete() + + def test_post_delete(self): + d = M.Discussion(shortname='test', name='test') + t = M.Thread.new(discussion_id=d._id, subject='Test Thread') + p = t.post('This is a post') + p.attach('foo.text', BytesIO(b''), + discussion_id=d._id, + thread_id=t._id, + post_id=p._id) + ThreadLocalORMSession.flush_all() + p.delete() + + def test_post_undo(self): + d = M.Discussion(shortname='test', name='test') + t = M.Thread.new(discussion_id=d._id, subject='Test Thread') + p = t.post('This is a post') + t.post('This is a post2') + t.post('This is a post3') + ThreadLocalORMSession.flush_all() + assert t.num_replies == 3 + p.spam() + assert t.num_replies == 2 + p.undo('ok') + assert t.num_replies == 3 - c.app.config.project.notifications_disabled = True - with patch('allura.model.notification.Notification.send_simple') as send: - t.post('Another post') + def test_post_permission_check(self): + d = M.Discussion(shortname='test', name='test') + t = M.Thread.new(discussion_id=d._id, subject='Test Thread') + c.user = M.User.anonymous() try: + t.post('This post will fail the check.') + assert False, "Expected an anonymous post to fail." + except exc.HTTPUnauthorized: + pass + t.post('This post will pass the check.', ignore_security=True) + + def test_post_url_paginated(self): + d = M.Discussion(shortname='test', name='test') + t = M.Thread(discussion_id=d._id, subject='Test Thread') + p = [] # posts in display order + ts = datetime.utcnow() - timedelta(days=1) + for i in range(5): + ts += timedelta(minutes=1) + p.append(t.post('This is a post #%s' % i, timestamp=ts)) + + ts += timedelta(minutes=1) + p.insert(1, t.post( + 'This is reply #0 to post #0', parent_id=p[0]._id, timestamp=ts)) + + ts += timedelta(minutes=1) + p.insert(2, t.post( + 'This is reply #1 to post #0', parent_id=p[0]._id, timestamp=ts)) + + ts += timedelta(minutes=1) + p.insert(4, t.post( + 'This is reply #0 to post #1', parent_id=p[3]._id, timestamp=ts)) + + ts += timedelta(minutes=1) + p.insert(6, t.post( + 'This is reply #0 to post #2', parent_id=p[5]._id, timestamp=ts)) + + ts += timedelta(minutes=1) + p.insert(7, t.post( + 'This is reply #1 to post #2', parent_id=p[5]._id, timestamp=ts)) + + ts += timedelta(minutes=1) + p.insert(8, t.post( + 'This is reply #0 to reply #1 to post #2', + parent_id=p[7]._id, timestamp=ts)) + + # with default paging limit + for _p in p: + url = t.url() + '?limit=25#' + _p.slug + assert _p.url_paginated() == url, _p.url_paginated() + + # with user paging limit + limit = 3 + c.user.set_pref('results_per_page', limit) + for i, _p in enumerate(p): + page = i // limit + url = t.url() + '?limit=%s' % limit + if page > 0: + url += '&page=%s' % page + url += '#' + _p.slug + assert _p.url_paginated() == url + + def test_post_url_paginated_with_artifact(self): + """Post.url_paginated should return link to attached artifact, if any""" + from forgewiki.model import Page + page = Page.upsert(title='Test Page') + thread = page.discussion_thread + comment = thread.post('Comment') + url = page.url() + '?limit=25#' + comment.slug + assert comment.url_paginated() == url + + def test_post_notify(self): + d = M.Discussion(shortname='test', name='test') + d.monitoring_email = '[email protected]' + t = M.Thread.new(discussion_id=d._id, subject='Test Thread') + with patch('allura.model.notification.Notification.send_simple') as send: + t.post('This is a post') send.assert_called_with(d.monitoring_email) - except AssertionError: - pass # method not called as expected - else: - assert False, 'send_simple must not be called' - - -@with_setup(setup_method) -@patch('allura.model.discuss.c.project.users_with_role') -def test_is_spam_for_admin(users): - users.return_value = [c.user, ] - d = M.Discussion(shortname='test', name='test') - t = M.Thread(discussion_id=d._id, subject='Test Thread') - t.post('This is a post') - post = M.Post.query.get(text='This is a post') - assert not t.is_spam(post), t.is_spam(post) - - -@with_setup(setup_method) -@patch('allura.model.discuss.c.project.users_with_role') -def test_is_spam(role): - d = M.Discussion(shortname='test', name='test') - t = M.Thread(discussion_id=d._id, subject='Test Thread') - role.return_value = [] - with mock.patch('allura.controllers.discuss.g.spam_checker') as spam_checker: + + c.app.config.project.notifications_disabled = True + with patch('allura.model.notification.Notification.send_simple') as send: + t.post('Another post') + try: + send.assert_called_with(d.monitoring_email) + except AssertionError: + pass # method not called as expected + else: + assert False, 'send_simple must not be called' + + @patch('allura.model.discuss.c.project.users_with_role') + def test_is_spam_for_admin(self, users): + users.return_value = [c.user, ] + d = M.Discussion(shortname='test', name='test') + t = M.Thread(discussion_id=d._id, subject='Test Thread') + t.post('This is a post') + post = M.Post.query.get(text='This is a post') + assert not t.is_spam(post), t.is_spam(post) + + @patch('allura.model.discuss.c.project.users_with_role') + def test_is_spam(self, role): + d = M.Discussion(shortname='test', name='test') + t = M.Thread(discussion_id=d._id, subject='Test Thread') + role.return_value = [] + with mock.patch('allura.controllers.discuss.g.spam_checker') as spam_checker: + spam_checker.check.return_value = True + post = mock.Mock() + assert t.is_spam(post), t.is_spam(post) + assert spam_checker.check.call_count == 1, spam_checker.call_count + + @mock.patch('allura.controllers.discuss.g.spam_checker') + def test_not_spam_and_has_unmoderated_post_permission(self, spam_checker): + spam_checker.check.return_value = False + d = M.Discussion(shortname='test', name='test') + t = M.Thread(discussion_id=d._id, subject='Test Thread') + role = M.ProjectRole.by_name('*anonymous')._id + post_permission = M.ACE.allow(role, 'post') + unmoderated_post_permission = M.ACE.allow(role, 'unmoderated_post') + t.acl.append(post_permission) + t.acl.append(unmoderated_post_permission) + with h.push_config(c, user=M.User.anonymous()): + post = t.post('Hey') + assert post.status == 'ok' + + @mock.patch('allura.controllers.discuss.g.spam_checker') + @mock.patch.object(M.Thread, 'notify_moderators') + def test_not_spam_but_has_no_unmoderated_post_permission(self, notify_moderators, spam_checker): + spam_checker.check.return_value = False + d = M.Discussion(shortname='test', name='test') + t = M.Thread(discussion_id=d._id, subject='Test Thread') + role = M.ProjectRole.by_name('*anonymous')._id + post_permission = M.ACE.allow(role, 'post') + t.acl.append(post_permission) + with h.push_config(c, user=M.User.anonymous()): + post = t.post('Hey') + assert post.status == 'pending' + assert notify_moderators.call_count == 1 + + @mock.patch('allura.controllers.discuss.g.spam_checker') + @mock.patch.object(M.Thread, 'notify_moderators') + def test_spam_and_has_unmoderated_post_permission(self, notify_moderators, spam_checker): spam_checker.check.return_value = True - post = mock.Mock() - assert t.is_spam(post), t.is_spam(post) - assert spam_checker.check.call_count == 1, spam_checker.call_count - - -@with_setup(setup_method) [email protected]('allura.controllers.discuss.g.spam_checker') -def test_not_spam_and_has_unmoderated_post_permission(spam_checker): - spam_checker.check.return_value = False - d = M.Discussion(shortname='test', name='test') - t = M.Thread(discussion_id=d._id, subject='Test Thread') - role = M.ProjectRole.by_name('*anonymous')._id - post_permission = M.ACE.allow(role, 'post') - unmoderated_post_permission = M.ACE.allow(role, 'unmoderated_post') - t.acl.append(post_permission) - t.acl.append(unmoderated_post_permission) - with h.push_config(c, user=M.User.anonymous()): - post = t.post('Hey') - assert post.status == 'ok' - - -@with_setup(setup_method) [email protected]('allura.controllers.discuss.g.spam_checker') [email protected](M.Thread, 'notify_moderators') -def test_not_spam_but_has_no_unmoderated_post_permission(notify_moderators, spam_checker): - spam_checker.check.return_value = False - d = M.Discussion(shortname='test', name='test') - t = M.Thread(discussion_id=d._id, subject='Test Thread') - role = M.ProjectRole.by_name('*anonymous')._id - post_permission = M.ACE.allow(role, 'post') - t.acl.append(post_permission) - with h.push_config(c, user=M.User.anonymous()): - post = t.post('Hey') - assert post.status == 'pending' - assert notify_moderators.call_count == 1 - - -@with_setup(setup_method) [email protected]('allura.controllers.discuss.g.spam_checker') [email protected](M.Thread, 'notify_moderators') -def test_spam_and_has_unmoderated_post_permission(notify_moderators, spam_checker): - spam_checker.check.return_value = True - d = M.Discussion(shortname='test', name='test') - t = M.Thread(discussion_id=d._id, subject='Test Thread') - role = M.ProjectRole.by_name('*anonymous')._id - post_permission = M.ACE.allow(role, 'post') - unmoderated_post_permission = M.ACE.allow(role, 'unmoderated_post') - t.acl.append(post_permission) - t.acl.append(unmoderated_post_permission) - with h.push_config(c, user=M.User.anonymous()): - post = t.post('Hey') - assert post.status == 'pending' - assert notify_moderators.call_count == 1 - - -@with_setup(setup_method) [email protected]('allura.controllers.discuss.g.spam_checker') -def test_thread_subject_not_included_in_text_checked(spam_checker): - spam_checker.check.return_value = False - d = M.Discussion(shortname='test', name='test') - t = M.Thread(discussion_id=d._id, subject='Test Thread') - t.post('Hello') - assert spam_checker.check.call_count == 1 - assert spam_checker.check.call_args[0][0] == 'Hello' - - -def test_post_count(): - d = M.Discussion(shortname='test', name='test') - t = M.Thread(discussion_id=d._id, subject='Test Thread') - M.Post(discussion_id=d._id, thread_id=t._id, status='spam') - M.Post(discussion_id=d._id, thread_id=t._id, status='ok') - M.Post(discussion_id=d._id, thread_id=t._id, status='pending') - ThreadLocalORMSession.flush_all() - assert t.post_count == 2 - - [email protected]('allura.controllers.discuss.g.spam_checker') -def test_spam_num_replies(spam_checker): - d = M.Discussion(shortname='test', name='test') - t = M.Thread(discussion_id=d._id, subject='Test Thread', num_replies=2) - M.Post(discussion_id=d._id, thread_id=t._id, status='ok') - ThreadLocalORMSession.flush_all() - p1 = M.Post(discussion_id=d._id, thread_id=t._id, status='spam') - p1.spam() - assert t.num_replies == 1 - - -def test_deleted_thread_index(): - d = M.Discussion(shortname='test', name='test') - t = M.Thread(discussion_id=d._id, subject='Test Thread') - p = M.Post(discussion_id=d._id, thread_id=t._id, status='ok') - t.delete() - ThreadLocalORMSession.flush_all() - - # re-query, so relationships get reloaded - ThreadLocalORMSession.close_all() - p = M.Post.query.get(_id=p._id) - - # just make sure this doesn't fail - p.index() \ No newline at end of file + d = M.Discussion(shortname='test', name='test') + t = M.Thread(discussion_id=d._id, subject='Test Thread') + role = M.ProjectRole.by_name('*anonymous')._id + post_permission = M.ACE.allow(role, 'post') + unmoderated_post_permission = M.ACE.allow(role, 'unmoderated_post') + t.acl.append(post_permission) + t.acl.append(unmoderated_post_permission) + with h.push_config(c, user=M.User.anonymous()): + post = t.post('Hey') + assert post.status == 'pending' + assert notify_moderators.call_count == 1 + + @mock.patch('allura.controllers.discuss.g.spam_checker') + def test_thread_subject_not_included_in_text_checked(self, spam_checker): + spam_checker.check.return_value = False + d = M.Discussion(shortname='test', name='test') + t = M.Thread(discussion_id=d._id, subject='Test Thread') + t.post('Hello') + assert spam_checker.check.call_count == 1 + assert spam_checker.check.call_args[0][0] == 'Hello' + + def test_post_count(self): + d = M.Discussion(shortname='test', name='test') + t = M.Thread(discussion_id=d._id, subject='Test Thread') + M.Post(discussion_id=d._id, thread_id=t._id, status='spam') + M.Post(discussion_id=d._id, thread_id=t._id, status='ok') + M.Post(discussion_id=d._id, thread_id=t._id, status='pending') + ThreadLocalORMSession.flush_all() + assert t.post_count == 2 + + @mock.patch('allura.controllers.discuss.g.spam_checker') + def test_spam_num_replies(self, spam_checker): + d = M.Discussion(shortname='test', name='test') + t = M.Thread(discussion_id=d._id, subject='Test Thread', num_replies=2) + M.Post(discussion_id=d._id, thread_id=t._id, status='ok') + ThreadLocalORMSession.flush_all() + p1 = M.Post(discussion_id=d._id, thread_id=t._id, status='spam') + p1.spam() + assert t.num_replies == 1 + + def test_deleted_thread_index(self): + d = M.Discussion(shortname='test', name='test') + t = M.Thread(discussion_id=d._id, subject='Test Thread') + p = M.Post(discussion_id=d._id, thread_id=t._id, status='ok') + t.delete() + ThreadLocalORMSession.flush_all() + + # re-query, so relationships get reloaded + ThreadLocalORMSession.close_all() + p = M.Post.query.get(_id=p._id) + + # just make sure this doesn't fail + p.index()
