Hi,

I'm attaching a patch with automated tests for stageuser plugin (https://fedorahosted.org/freeipa/ticket/3813). The user plugin test is affected as well (one class was added). The tests seem a bit of a mess even to myself, but what with the way freeipa behaves I didn't know how else to implement them, but I'm eager to learn how to do it in a nicer way, if someone has a better idea.


Lenka


>From c7b677696ed56649debac01090fe40389bb1d372 Mon Sep 17 00:00:00 2001
From: Lenka Ryznarova <lenka.ryznar...@gmail.com>
Date: Mon, 27 Jul 2015 14:17:58 +0200
Subject: [PATCH] Automated test for stageuser plugin

Ticket: https://fedorahosted.org/freeipa/ticket/3813
Test plan: http://www.freeipa.org/page/V4/User_Life-Cycle_Management/Test_Plan
---
 ipatests/test_xmlrpc/test_stageuser_plugin.py | 753 ++++++++++++++++++++++++++
 ipatests/test_xmlrpc/test_user_plugin.py      | 255 ++++++++-
 2 files changed, 1002 insertions(+), 6 deletions(-)
 create mode 100644 ipatests/test_xmlrpc/test_stageuser_plugin.py

diff --git a/ipatests/test_xmlrpc/test_stageuser_plugin.py b/ipatests/test_xmlrpc/test_stageuser_plugin.py
new file mode 100644
index 0000000000000000000000000000000000000000..66b61d5c38807c6a771e0532f73b163b67b902f5
--- /dev/null
+++ b/ipatests/test_xmlrpc/test_stageuser_plugin.py
@@ -0,0 +1,753 @@
+# Authors:
+#   ldoudova <ldoud...@redhat.com>
+#
+# Copyright (C) 2008, 2009  Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Test the `ipalib/plugins/stageuser.py` module.
+"""
+
+
+import datetime
+import ldap
+import re
+import functools
+import pytest
+
+from ipalib import api, errors
+
+from ipatests.test_xmlrpc.ldaptracker import Tracker
+from ipatests.test_xmlrpc import objectclasses
+from ipatests.test_xmlrpc.xmlrpc_test import (XMLRPC_test, fuzzy_digits, fuzzy_uuid,
+                         fuzzy_password, fuzzy_string, fuzzy_dergeneralizedtime,
+                         add_sid, add_oc, raises_exact)
+
+from ipatests.util import assert_equal, assert_deepequal, assert_not_equal, raises
+from ipapython.dn import DN
+from ipatests.test_xmlrpc.test_user_plugin import UserTracker, get_user_dn
+
+user1 = u'tuser1'
+user2 = u'tuser2'
+renameduser1 = u'ruser'
+group1 = u'group1'
+admins_group = u'admins'
+
+uid = u'123'
+gid = u'456'
+invalidrealm1 = u'sus...@notfound.org'
+invalidrealm2 = u'suser1@b...@notfound.org'
+
+invaliduser1 = u'+tuser1'
+invaliduser2 = u'tuser1234567890123456789012345678901234567890'
+
+sshpubkey = (u'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDGAX3xAeLeaJggwTqMjxNwa6X'
+              'HBUAikXPGMzEpVrlLDCZtv00djsFTBi38PkgxBJVkgRWMrcBsr/35lq7P6w8KGI'
+              'wA8GI48Z0qBS2NBMJ2u9WQ2hjLN6GdMlo77O0uJY3251p12pCVIS/bHRSq8kHO2'
+              'No8g7KA9fGGcagPfQH+ee3t7HUkpbQkFTmbPPN++r3V8oVUk5LxbryB3UIIVzNm'
+              'cSIn3JrXynlvui4MixvrtX6zx+O/bBo68o8/eZD26QrahVbA09fivrn/4h3TM01'
+              '9Eu/c2jOdckfU3cHUV/3Tno5d6JicibyaoDDK7S/yjdn5jhaz8MSEayQvFkZkiF'
+              '0L public key test')
+sshpubkeyfp = (u'13:67:6B:BF:4E:A2:05:8E:AE:25:8B:A1:31:DE:6F:1B '
+                'public key test (ssh-rsa)')
+
+options_ok = [
+    {u'cn': u'name'},
+    {u'initials': u'in'},
+    {u'displayname': u'display'},
+    {u'homedirectory': u'/home/homedir'},
+    {u'gecos': u'gecos'},
+    {u'loginshell': u'/bin/shell'},
+    {u'mail': u'email@email.email'},
+    {u'title': u'newbie'},
+    {u'krbprincipalname': u'kerberos@%s' % api.env.realm},
+    {u'krbprincipalname': u'KERBEROS@%s' % api.env.realm},
+    {u'street': u'first street'},
+    {u'l': u'prague'},
+    {u'st': u'czech'},
+    {u'postalcode': u'12345'},
+    {u'telephonenumber': u'123456789'},
+    {u'facsimiletelephonenumber': u'123456789'},
+    {u'mobile': u'123456789'},
+    {u'pager': u'123456789'},
+    {u'ou': u'engineering'},
+    {u'carlicense': u'abc1234'},
+    {u'ipasshpubkey': sshpubkey},
+    {u'manager': u'auser1'},
+    {u'uidnumber': uid},
+    {u'gidnumber': gid},
+    {u'uidnumber': uid, u'gidnumber': gid},
+    {u'userpassword': u'Secret123'},
+    {u'random': True},
+    ]
+
+class StageUserTracker(Tracker):
+    """ Tracker class for staged user LDAP object 
+
+        Implements helper functions for host plugin.
+        StageUserTracker object stores information about the user. 
+    """
+
+    retrieve_keys = {u'uid', u'givenname', u'sn', u'homedirectory', u'loginshell',
+        u'uidnumber', u'gidnumber', u'mail', u'ou',
+        u'telephonenumber', u'title', u'memberof', u'nsaccountlock',
+        u'memberofindirect', u'ipauserauthtype', u'userclass',
+        u'ipatokenradiusconfiglink', u'ipatokenradiususername',
+        u'krbprincipalexpiration', u'usercertificate', u'dn', u'has_keytab', u'has_password', 
+        u'street', u'postalcode', u'facsimiletelephonenumber', u'carlicense', 
+        u'ipasshpubkey', u'sshpubkeyfp', u'l', u'st', u'mobile', u'pager',}
+    retrieve_all_keys = retrieve_keys | {
+        u'cn', u'ipauniqueid', u'objectclass', u'description', 
+        u'displayname', u'gecos', u'initials', u'krbprincipalname', u'manager'}
+    
+    create_keys = retrieve_keys | {u'objectclass', u'ipauniqueid',
+                                   u'randompassword', u'userpassword'}
+    create_all_keys = retrieve_all_keys | {u'objectclass', u'ipauniqueid',
+                                   u'randompassword', u'userpassword', u'krbextradata',
+                                   u'krblastpwdchange', u'krbpasswordexpiration', u'krbprincipalkey'}
+    
+    update_keys = retrieve_keys - {u'dn', u'nsaccountlock'}
+    activate_keys = retrieve_keys | {u'has_keytab', u'has_password', u'nsaccountlock'}
+
+    def __init__(self, name, givenname, sn, **kwargs):
+        super(StageUserTracker, self).__init__(default_version=None)
+        self.uid = name
+        self.givenname = givenname
+        self.sn = sn
+        self.dn = DN(('uid', self.uid), api.env.container_stageuser, api.env.basedn)
+
+        self.kwargs = kwargs
+
+    def make_create_command(self, options=None, force=None, from_delete=False):
+        """ Make function that creates a staged user using stageuser-add """
+        if options != None:
+            self.kwargs = options
+        return self.make_command('stageuser_add', self.uid, 
+                                 givenname=self.givenname,
+                                 sn=self.sn, from_delete=from_delete, **self.kwargs)
+
+    def make_delete_command(self):
+        """ Make function that deletes a staged user using stageuser-del """
+        return self.make_command('stageuser_del', self.uid)
+
+    def make_retrieve_command(self, all=False, raw=False):
+        """ Make function that retrieves a staged user using stageuser-show """
+        return self.make_command('stageuser_show', self.uid, all=all)
+
+    def make_find_command(self, *args, **kwargs):
+        """ Make function that finds staged user using stageuser-find """
+        return self.make_command('stageuser_find', *args, **kwargs)
+
+    def make_update_command(self, updates):
+        """ Make function that updates staged user using stageuser-mod """
+        return self.make_command('stageuser_mod', self.uid, **updates)
+
+    def make_activate_command(self):
+        """ Make function that activates staged user using stageuser-activate """
+        return self.make_command('stageuser_activate', self.uid)
+
+    def track_create(self):
+        """ Update expected state for staged user creation """
+        self.attrs = dict(
+            dn=self.dn,
+            uid=[self.uid],
+            givenname=[self.givenname],
+            sn=[self.sn],
+            homedirectory=[u'/home/%s' % self.uid],
+            displayname=[u'%s %s' % (self.givenname, self.sn)],
+            cn=[u'%s %s' % (self.givenname, self.sn)],
+            initials=[u'%s%s' % (self.givenname[0], self.sn[0])],
+            objectclass=objectclasses.user_base,
+            description=[u'__no_upg__'],
+            ipauniqueid=[u'autogenerate'],
+            uidnumber=[u'-1'],
+            gidnumber=[u'-1'],
+            krbprincipalname=[u'%s@%s' % (self.uid, self.api.env.realm)],
+            mail=[u'%s@%s' % (self.uid, self.api.env.domain)],
+            gecos=[u'%s %s' % (self.givenname, self.sn)],
+            loginshell=[u'/bin/sh'],
+            has_keytab=False,
+            has_password=False,
+            nsaccountlock=[u'true'],
+            )
+
+        for key in self.kwargs:
+            if key == u'krbprincipalname':
+                self.attrs[key] = [u'%s@%s' % ((self.kwargs[key].split('@'))[0].lower(), (self.kwargs[key].split('@'))[1])]
+            elif key == u'manager':
+                self.attrs[key] = [unicode(get_user_dn(self.kwargs[key]))]
+            elif key == u'ipasshpubkey':
+                self.attrs[u'sshpubkeyfp'] = [sshpubkeyfp]
+                self.attrs[key] = [self.kwargs[key]]
+            elif key == u'random' or key == u'userpassword':
+                self.attrs[u'krbextradata'] = [fuzzy_string]
+                self.attrs[u'krbpasswordexpiration'] = [fuzzy_dergeneralizedtime]
+                self.attrs[u'krblastpwdchange'] = [fuzzy_dergeneralizedtime]
+                self.attrs[u'krbprincipalkey'] = [fuzzy_string]
+                self.attrs[u'userpassword'] = [fuzzy_string]
+                self.attrs[u'has_keytab'] = True
+                self.attrs[u'has_password'] = True
+                if key == 'random':
+                    self.attrs[u'randompassword'] = fuzzy_string
+            else:
+                self.attrs[key] = [self.kwargs[key]]
+        
+        self.exists = True
+
+    def check_create(self, result):
+        """ Check 'stageuser-add' command result """
+        assert_deepequal(dict(
+            value=self.uid,
+            summary=u'Added stage user "%s"' % self.uid,
+            result=self.filter_attrs(self.create_all_keys),
+        ), result)
+
+    def check_delete(self, result):
+        """ Check 'stageuser-del' command result """
+        assert_deepequal(dict(
+            value=[self.uid],
+            summary=u'Deleted stage user "%s"' % self.uid,
+            result=dict(failed=[]),
+            ), result)
+    
+    def check_retrieve(self, result, all=False, raw=False):
+        """ Check 'stageuser-show' command result """
+        if all:
+            expected = self.filter_attrs(self.retrieve_all_keys)
+        else:
+            expected = self.filter_attrs(self.retrieve_keys)
+        
+        # small override because stageuser-find returns different type of nsaccountlock value than DS, but overall the value fits expected result
+        if expected[u'nsaccountlock'] == [u'true']:     
+            expected[u'nsaccountlock'] = True
+        elif expected[u'nsaccountlock'] == [u'false']:
+            expected[u'nsaccountlock'] = False
+        
+        assert_deepequal(dict(
+            value=self.uid,
+            summary=None,
+            result=expected,
+        ), result)
+
+    def check_find(self, result, all=False, raw=False):
+        """ Check 'stageuser-find' command result """
+        if all:
+            expected = self.filter_attrs(self.retrieve_all_keys)
+        else:
+            expected = self.filter_attrs(self.retrieve_keys)
+
+        # small override because stageuser-find returns different type of nsaccountlock value than DS, but overall the value fits expected result
+        if expected[u'nsaccountlock'] == [u'true']:     
+            expected[u'nsaccountlock'] = True
+        elif expected[u'nsaccountlock'] == [u'false']:
+            expected[u'nsaccountlock'] = False
+
+        assert_deepequal(dict(
+            count=1,
+            truncated=False,
+            summary=u'1 user matched',
+            result=[expected],
+        ), result)
+
+    def check_find_nomatch(self, result):
+        """ Check 'stageuser-find' command result """
+        assert_deepequal(dict(
+            count=0,
+            truncated=False,
+            summary=u'0 users matched',
+            result=[],
+        ), result)
+
+    def check_update(self, result, extra_keys=()):
+        """ Check 'stageuser-mod' command result """
+        assert_deepequal(dict(
+            value=self.uid,
+            summary=u'Modified stage user "%s"' % self.uid,
+            result=self.filter_attrs(self.update_keys | set(extra_keys))
+        ), result)
+
+    def make_fixture_activate(self, request):
+        """Make a pytest fixture for a staged user that is to be activated
+
+        The fixture ensures the plugin entry does not exist before
+        and after the tests that use it. It takes into account that the staged user 
+        no longer exists after activation, therefore the fixture verifies after the tests
+        that the staged user doesn't exist instead of deleting it.
+        """
+        del_command = self.make_delete_command()
+        try:
+            del_command()
+        except errors.NotFound:
+            pass
+
+        def finish():
+            with raises_exact(errors.NotFound(reason=u'%s: stage user not found' % self.uid)):
+                del_command()
+
+        request.addfinalizer(finish)
+
+        return self
+
+    def create_from_preserved(self, user):
+        """ Copies values from preserved user """
+        self.attrs = user.attrs
+        self.uid = user.uid
+        self.givenname = user.givenname
+        self.sn = user.sn
+
+@pytest.fixture(scope='class')
+def stageduser(request):
+    tracker = StageUserTracker(name=u'suser1', givenname=u'staged', sn=u'user')
+    return tracker.make_fixture(request)
+
+@pytest.fixture(scope='class', params=options_ok)
+def stageduser2(request):
+    tracker = StageUserTracker(u'suser2', u'staged', u'user', **request.param)
+    return tracker.make_fixture_activate(request)
+
+@pytest.fixture(scope='class')
+def stageduser3(request):
+    tracker = StageUserTracker(name=u'suser3', givenname=u'staged', sn=u'user')
+    return tracker.make_fixture_activate(request)
+
+@pytest.fixture(scope='class')
+def stageduser4(request):
+    tracker = StageUserTracker(u'tuser', u'test', u'user')
+    return tracker.make_fixture(request)
+
+@pytest.fixture(scope='class')
+def user(request):
+    tracker = UserTracker(u'auser1', u'active', u'user')
+    return tracker.make_fixture(request)
+
+@pytest.fixture(scope='class')
+def user2(request):
+    tracker = UserTracker(u'suser3', u'staged', u'user')
+    return tracker.make_fixture(request)
+
+@pytest.fixture(scope='class')
+def user3(request):
+    tracker = UserTracker(u'auser2', u'active', u'user')
+    return tracker.make_fixture(request)
+
+@pytest.fixture(scope='class')
+def user4(request):
+    tracker = UserTracker(u'tuser', u'test', u'user')
+    return tracker.make_fixture(request)
+
+@pytest.fixture(scope='class')
+def user5(request):
+    tracker = UserTracker(u'tuser', u'test', u'user')
+    return tracker.make_fixture(request)
+
+@pytest.fixture(scope='class')
+def user6(request):
+    tracker = UserTracker(u'suser2', u'staged', u'user')
+    return tracker.make_fixture(request)
+
+class TestNonexistentStagedUser(XMLRPC_test):
+    def test_retrieve_nonexistent(self, stageduser):
+        stageduser.ensure_missing()
+        command = stageduser.make_retrieve_command()
+        with raises_exact(errors.NotFound(
+                reason=u'%s: stage user not found' % stageduser.uid)):
+            command()
+    
+    def test_delete_nonexistent(self, stageduser):
+        stageduser.ensure_missing()
+        command = stageduser.make_delete_command()
+        with raises_exact(errors.NotFound(
+                reason=u'%s: stage user not found' % stageduser.uid)):
+            command()
+
+    def test_update_nonexistent(self, stageduser):
+        stageduser.ensure_missing()
+        command = stageduser.make_update_command(updates=dict(givenname=u'changed'))
+        with raises_exact(errors.NotFound(
+                reason=u'%s: stage user not found' % stageduser.uid)):
+            command()
+    
+    def test_find_nonexistent(self, stageduser):
+        stageduser.ensure_missing()
+        command = stageduser.make_find_command(uid=stageduser.uid)
+        result = command()
+        stageduser.check_find_nomatch(result)
+
+class TestStagedUser(XMLRPC_test):
+    def test_create_duplicate(self, stageduser):
+        stageduser.ensure_exists()
+        command = stageduser.make_create_command()
+        with raises_exact(errors.DuplicateEntry(
+            message=u'stage user with name "%s" already exists' % stageduser.uid)):
+            command()
+
+    def test_activate(self, stageduser3, user2):
+        stageduser3.ensure_exists()
+        user2.ensure_missing()
+        user2 = UserTracker(stageduser3.uid, stageduser3.givenname, stageduser3.sn)
+        user2.create_from_staged(stageduser3)
+        command = stageduser3.make_activate_command()
+        result = command()
+        user2.check_activate(result)
+        
+        command = stageduser3.make_retrieve_command()
+        with raises_exact(errors.NotFound(
+            reason=u'%s: stage user not found' % stageduser3.uid)):
+            command()
+        user2.delete()
+    
+    def test_show_stageduser(self, stageduser):
+        stageduser.retrieve()
+    
+    def test_showall_stageduser(self, stageduser):
+        stageduser.retrieve(all=True)
+
+    def test_create_attr(self, stageduser2, user, user6):
+        # create staged user with specified parameters
+        user.ensure_exists() # necessary for manager test, do not delete
+        stageduser2.ensure_missing()
+        command = stageduser2.make_create_command()
+        result = command()
+        stageduser2.track_create()
+        stageduser2.check_create(result)    
+
+        # activate user, verify that specified values were preserved after activation
+        user6.ensure_missing()
+        user6 = UserTracker(stageduser2.uid, stageduser2.givenname, stageduser2.sn, **stageduser2.kwargs)
+        user6.create_from_staged(stageduser2)
+        command = stageduser2.make_activate_command()
+        result = command()
+        user6.check_activate(result)
+
+        # verify the staged user does not exist after activation        
+        command = stageduser2.make_retrieve_command()
+        with raises_exact(errors.NotFound(
+            reason=u'%s: stage user not found' % stageduser2.uid)):
+            command()
+
+        user6.delete()
+    
+    def test_delete_stageduser(self, stageduser):
+        stageduser.delete()
+    
+    def test_find_stageduser(self, stageduser):
+        stageduser.find()
+
+    def test_findall_stageduser(self, stageduser):
+        stageduser.find(all=True)
+
+    def test_update_stageduser(self, stageduser):
+        stageduser.update(dict(givenname=u'changed',),
+                            expected_updates=dict(givenname=[u'changed'],))
+        stageduser.retrieve()
+    
+    def test_update_uid(self, stageduser):
+        stageduser.update(dict(uidnumber=uid), expected_updates=dict(uidnumber=[uid]))
+        stageduser.retrieve()
+
+    def test_update_gid(self, stageduser):
+        stageduser.update(dict(uidnumber=gid), 
+                            expected_updates=dict(uidnumber=[gid]))
+        stageduser.retrieve()
+
+    def test_update_uid_gid(self, stageduser):
+        stageduser.update(dict(uidnumber=uid, gidnumber=gid), 
+                            expected_updates=dict(uidnumber=[uid], gidnumber=[gid]))
+        stageduser.retrieve()
+
+class TestCreateInvalidAttributes(XMLRPC_test):
+    def test_create_invalid_uid(self):
+        invalid = StageUserTracker(invaliduser1, u'invalid', u'user')
+        command = invalid.make_create_command()
+        with raises_exact(errors.ValidationError(
+            name='login',
+            error=u"may only include letters, numbers, _, -, . and $")):
+            command()
+
+    def test_create_long_uid(self):
+        invalid = StageUserTracker(invaliduser2, u'invalid', u'user')
+        command = invalid.make_create_command()
+        with raises_exact(errors.ValidationError(
+            name='login',
+            error=u"can be at most 32 characters")):
+            command()
+        
+    def test_create_uid_string(self, stageduser):
+        stageduser.ensure_missing()
+        command = stageduser.make_create_command(options={u'uidnumber': u'text'})
+        with raises_exact(errors.ConversionError(
+            message=u'invalid \'uid\': must be an integer')):
+            command()
+
+    def test_create_gid_string(self, stageduser):
+        stageduser.ensure_missing()
+        command = stageduser.make_create_command(options={u'gidnumber': u'text'})
+        with raises_exact(errors.ConversionError(
+            message=u'invalid \'gidnumber\': must be an integer')):
+            command()
+    
+    def test_create_uid_negative(self, stageduser):
+        stageduser.ensure_missing()
+        command = stageduser.make_create_command(options={u'uidnumber': u'-123'})
+        with raises_exact(errors.ValidationError(
+            message=u'invalid \'uid\': must be at least 1')):
+            command()
+
+    def test_create_gid_negative(self, stageduser):
+        stageduser.ensure_missing()
+        command = stageduser.make_create_command(options={u'gidnumber': u'-123'})
+        with raises_exact(errors.ValidationError(
+            message=u'invalid \'gidnumber\': must be at least 1')):
+            command()
+
+    def test_create_krbprincipal_bad_realm(self, stageduser):
+        stageduser.ensure_missing()
+        command = stageduser.make_create_command(options={u'krbprincipalname': invalidrealm1})
+        with raises_exact(errors.RealmMismatch(
+            message=u'The realm for the principal does not match the realm for this IPA server')):
+            command()
+
+    def test_create_krbprincipal_malformed(self, stageduser):
+        stageduser.ensure_missing()
+        command = stageduser.make_create_command(options={u'krbprincipalname': invalidrealm2})
+        with raises_exact(errors.MalformedUserPrincipal(
+            message=u'Principal is not of the form user@REALM: \'%s\'' % invalidrealm2)):
+            command()
+
+class TestUpdateInvalidAttributes(XMLRPC_test):
+    def test_update_uid_string(self, stageduser):
+        stageduser.ensure_exists()
+        command = stageduser.make_update_command(updates={u'uidnumber': u'text'})
+        with raises_exact(errors.ConversionError(
+            message=u'invalid \'uid\': must be an integer')):
+            command()
+
+    def test_update_gid_string(self, stageduser):
+        stageduser.ensure_exists()
+        command = stageduser.make_update_command(updates={u'gidnumber': u'text'})
+        with raises_exact(errors.ConversionError(
+            message=u'invalid \'gidnumber\': must be an integer')):
+            command()
+
+    def test_update_uid_negative(self, stageduser):
+        stageduser.ensure_exists()
+        command = stageduser.make_update_command(updates={u'uidnumber': u'-123'})
+        with raises_exact(errors.ValidationError(
+            message=u'invalid \'uid\': must be at least 1')):
+            command()
+
+    def test_update_gid_negative(self, stageduser):
+        stageduser.ensure_exists()
+        command = stageduser.make_update_command(updates={u'gidnumber': u'-123'})
+        with raises_exact(errors.ValidationError(
+            message=u'invalid \'gidnumber\': must be at least 1')):
+            command()
+
+class TestActive(XMLRPC_test):
+    def test_delete(self, user):
+        user.ensure_exists()
+        user.track_delete()
+        command = user.make_delete_command()
+        result = command()
+        user.check_delete(result)
+
+    def test_delete_nopreserve(self, user):
+        user.ensure_exists()
+        user.track_delete()
+        command = user.make_delete_command(no_preserve=True)
+        result = command()
+        user.check_delete(result)
+
+    def test_delete_preserve_nopreserve(self, user):
+        user.ensure_exists() 
+        command = user.make_delete_command(no_preserve=True, preserve=True)
+        with raises_exact(errors.MutuallyExclusiveError(
+            message=u'preserve and no-preserve cannot be both set')):
+            command()
+
+    def test_delete_preserve(self, user):
+        user.ensure_exists()
+        user.track_delete()
+        command = user.make_delete_command(no_preserve=False, preserve=True)
+        result = command()
+        user.check_delete(result)
+        
+        command = user.make_delete_command()
+        result = command()
+        user.check_delete(result)
+        
+        command = user.make_retrieve_command()
+        with raises_exact(errors.NotFound(
+            reason=u'%s: user not found' % user.uid)):
+            command()
+
+class TestPreserved(XMLRPC_test):
+    def test_search_preserved1(self, user):
+        user.ensure_exists()
+        command = user.make_delete_command(no_preserve=False, preserve=True)
+        result = command()
+        user.check_delete(result)
+
+        command = user.make_find_command(uid=user.uid)
+        result = command()
+        user.check_find_nomatch(result)
+
+    def test_search_preserved2(self, user):
+        user.ensure_exists()
+        user.make_delete_command(no_preserve=False, preserve=True)
+
+        command = user.make_find_command(uid=user.uid, preserved=True, all=False)
+        result = command()
+        user.check_find(result, all=False)
+
+    def test_search_preserved2_all(self, user):
+        user.ensure_exists()
+        user.make_delete_command(no_preserve=False, preserve=True)
+
+        command = user.make_find_command(uid=user.uid, preserved=True, all=True)
+        result = command()
+        user.check_find(result, all=True)
+
+    def test_retrieve_preserved(self, user):
+        user.ensure_exists()
+        user.make_delete_command(no_preserve=False, preserve=True)
+        
+        command = user.make_retrieve_command()
+        result = command()
+        user.check_retrieve(result)
+
+    def test_reactivate_preserved(self, user):
+        user.ensure_exists()
+        user.make_delete_command(no_preserve=False, preserve=True)
+        
+        command = user.make_undelete_command()
+        result = command()
+        user.check_undel(result)
+    
+    def test_staged_from_preserved(self, user, stageduser):
+        user.ensure_exists()
+        user.make_delete_command(no_preserve=False, preserve=True)
+
+        stageduser.ensure_missing()
+        stageduser = StageUserTracker(user.uid, user.givenname, user.sn)
+        stageduser.create_from_preserved(user)
+        command = user.make_create_command(from_delete=True)
+        result = command()
+        stageduser.check_create(result)
+
+        command = user.make_retrieve_command()
+        with raises_exact(errors.NotFound(
+            reason=u'%s: user not found' % stageduser.uid)):
+            command()
+        stageduser.delete()
+    
+    def test_delete_preserved(self, user):
+        user.delete_preserve()
+        user.delete()
+
+    def test_enable_preserved(self, user):
+        user.delete_preserve()
+        command = user.make_enable_command()
+        with raises_exact(errors.MidairCollision(
+            message=u'change collided with another change')):
+            command()
+        user.delete()
+
+class TestManagers(XMLRPC_test):
+    def test_staged_manager(self, user, stageduser):
+        user.ensure_exists()
+        stageduser.ensure_exists()
+
+        command = user.make_update_command(updates=dict(manager=stageduser.uid))
+        with raises_exact(errors.NotFound(
+            reason=u'manager %s not found' % stageduser.uid)):
+            command()
+        user.delete()
+        stageduser.delete()
+
+    def test_preserved_manager(self, user, user3):
+        user.ensure_exists()
+        user3.delete_preserve()
+
+        command = user.make_update_command(updates=dict(manager=user3.uid))
+        with raises_exact(errors.NotFound(
+            reason=u'manager %s not found' % user3.uid)):
+            command()
+
+        user3.delete()
+    
+    def test_delete_manager_preserved(self, user, user3):
+        user3.ensure_exists()
+
+        user.update(updates=dict(manager=user3.uid), 
+            expected_updates=dict(manager=[user3.uid], nsaccountlock=False))
+
+        user3.delete_preserve()
+        del user.attrs[u'manager']
+
+        command = user.make_retrieve_command(all=True)
+        result = command()
+        user.check_retrieve(result, all=True)
+        
+        # verify whether user has a manager attribute
+        if u'manager' in result['result']:
+            assert False
+    
+class TestDuplicates(XMLRPC_test):
+    def test_active_same_as_preserved(self, user4, user5):
+        user4.ensure_missing()
+        user5.delete_preserve()
+        command = user4.make_create_command()
+        with raises_exact(errors.DuplicateEntry(message=u'user with name "%s" already exists' % user4.uid)):
+            command()
+        user5.delete()
+    
+    def test_staged_same_as_active(self, user4, stageduser4):
+        user4.ensure_exists()
+        stageduser4.create() # can be created
+
+        command = stageduser4.make_activate_command()
+        with raises_exact(errors.DuplicateEntry(
+            message=u'active user with name "%s" already exists' % user4.uid)):
+            command() # cannot be activated
+
+        user4.delete()
+        stageduser4.delete()
+    
+    def test_staged_same_as_preserved(self, user5, stageduser4):
+        user5.delete_preserve()
+        stageduser4.create() # can be created
+        
+        command = stageduser4.make_activate_command()
+        with raises_exact(errors.DuplicateEntry(
+            message=u'This entry already exists')):
+            command() # cannot be activated
+
+        user5.delete()
+        stageduser4.delete()
+
+    def test_active_same_as_staged(self, user4, stageduser4):
+        user4.ensure_missing()
+        stageduser4.ensure_exists()
+        command = user4.make_create_command()
+        result = command()
+        user4.track_create()
+        user4.check_create(result) # can be created
+
+        command = stageduser4.make_activate_command()
+        with raises_exact(errors.DuplicateEntry(
+            message=u'active user with name "%s" already exists' % user4.uid)):
+            command() # cannot be activated
diff --git a/ipatests/test_xmlrpc/test_user_plugin.py b/ipatests/test_xmlrpc/test_user_plugin.py
index 1e226d0d8e180850adc8f6f544a18150571f71e3..ec2120328a5275e1fe8e875cfa78cc5917ce3ac0 100644
--- a/ipatests/test_xmlrpc/test_user_plugin.py
+++ b/ipatests/test_xmlrpc/test_user_plugin.py
@@ -26,14 +26,16 @@ Test the `ipalib/plugins/user.py` module.
 import datetime
 import ldap
 import re
+import functools
 
 from ipalib import api, errors
 from ipatests.test_xmlrpc import objectclasses
-from ipatests.util import assert_equal, assert_not_equal, raises
+from ipatests.util import assert_equal, assert_deepequal, assert_not_equal, raises
 from xmlrpc_test import (XMLRPC_test, Declarative, fuzzy_digits, fuzzy_uuid,
                          fuzzy_password, fuzzy_string, fuzzy_dergeneralizedtime,
                          add_sid, add_oc)
 from ipapython.dn import DN
+from ipatests.test_xmlrpc.ldaptracker import Tracker
 
 user1 = u'tuser1'
 user2 = u'tuser2'
@@ -301,7 +303,6 @@ class test_user(Declarative):
                             objectclasses.user,
                             u'ipantuserattrs'
                         ) + [u'ipauser'],
-                        preserved=False
                     ),
                 ],
                 summary=u'1 user matched',
@@ -1163,8 +1164,7 @@ class test_user(Declarative):
                     manager=[renameduser1],
                     objectclass=add_oc(objectclasses.user_base,
                                        u'ipantuserattrs'),
-                    preserved=False,
-                    omit=['mepmanagedentry'],
+                    omit=['mepmanagedentry']
                 ),
                 value=user2,
                 summary=None,
@@ -1193,8 +1193,7 @@ class test_user(Declarative):
                     memberof_group=[group1],
                     objectclass=add_oc(objectclasses.user_base,
                                        u'ipantuserattrs'),
-                    preserved=False,
-                    omit=['mepmanagedentry'],
+                    omit=['mepmanagedentry']
                 ),
                 value=user2,
                 summary=None,
@@ -1643,3 +1642,247 @@ class test_denied_bind_with_expired_principal(XMLRPC_test):
                 krbprincipalexpiration=principal_expiration_string)
 
         self.connection.simple_bind_s(str(get_user_dn(user1)), self.password)
+
+
+""" Class for host plugin like tests """
+class UserTracker(Tracker):
+    retrieve_keys = {u'uid', u'givenname', u'sn', u'homedirectory', u'loginshell',
+        u'uidnumber', u'gidnumber', u'mail', u'ou',
+        u'telephonenumber', u'title', u'memberof',
+        u'memberofindirect', u'ipauserauthtype', u'userclass',
+        u'ipatokenradiusconfiglink', u'ipatokenradiususername',
+        u'krbprincipalexpiration', u'usercertificate', u'dn', u'has_keytab', u'has_password', 
+        u'street', u'postalcode', u'facsimiletelephonenumber', u'carlicense', 
+        u'ipasshpubkey', u'sshpubkeyfp', u'nsaccountlock', u'preserved', u'memberof_group', 
+        u'l', u'mobile', u'krbextradata', u'krblastpwdchange', u'krbpasswordexpiration', 
+        u'pager', u'st'}
+
+    retrieve_all_keys = retrieve_keys | {
+        u'cn', u'ipauniqueid', u'objectclass', u'mepmanagedentry',
+        u'displayname', u'gecos', u'initials', u'krbprincipalname', u'manager'}
+
+    retrieve_preserved_keys = retrieve_keys - {u'memberof_group'}
+    retrieve_preserved_all_keys = retrieve_all_keys - {u'memberof_group'}
+
+    create_keys = retrieve_all_keys | {u'randompassword', u'mepmanagedentry',  
+        u'krbextradata', u'krbpasswordexpiration', u'krblastpwdchange', u'krbprincipalkey', 
+        u'randompassword', u'userpassword'}
+    update_keys = retrieve_keys - {u'dn'}
+    activate_keys = retrieve_all_keys - {u'has_keytab', u'has_password', u'nsaccountlock', u'sshpubkeyfp'}
+
+    find_keys = retrieve_keys - {u'mepmanagedentry', u'memberof_group'}
+    find_all_keys = retrieve_all_keys - {u'mepmanagedentry', u'memberof_group'}
+
+
+
+    def __init__(self, name, givenname, sn, **kwargs):
+        super(UserTracker, self).__init__(default_version=None)
+        self.uid = name
+        self.givenname = givenname
+        self.sn = sn
+        self.dn = DN(('uid', self.uid), api.env.container_user, api.env.basedn)
+
+        self.kwargs = kwargs
+
+    def make_create_command(self, force=None):
+        """ Make function that crates a user using user-add """
+        return self.make_command('user_add', self.uid, givenname=self.givenname, 
+                                 sn=self.sn, **self.kwargs)
+
+    def make_delete_command(self, no_preserve=True, preserve=False):
+        """ Make function that deletes a user using user-del """
+
+        if preserve and not no_preserve:
+            # necessary to change some user attributes due to moving to different container
+            self.attrs[u'dn'] = DN(('uid', self.uid), api.env.container_deleteuser, api.env.basedn)
+            self.attrs[u'objectclass'] = objectclasses.user_base
+        
+        return self.make_command('user_del', self.uid, no_preserve=no_preserve, preserve=preserve)
+
+    def make_retrieve_command(self, all=False, raw=False):
+        """ Make function that retrieves a user using user-show """
+        return self.make_command('user_show', self.uid, all=all)
+
+    def make_find_command(self, *args, **kwargs):
+        """ Make function that finds user using user-find """
+        return self.make_command('user_find', *args, **kwargs)
+
+    def make_update_command(self, updates):
+        """ Make function that updates user using user-mod """
+        return self.make_command('user_mod', self.uid, **updates)
+
+    def make_undelete_command(self):
+        """ Make function that activates preserved user using user-undel """
+        return self.make_command('user_undel', self.uid)
+
+    def make_enable_command(self):
+        """ Make function that enables user using user-enable """
+        return self.make_command('user_enable', self.uid)
+
+    def track_create(self):
+        """ Update expected state for user creation """
+        self.attrs = dict(
+            dn=self.dn,
+            uid=[self.uid],
+            givenname=[self.givenname],
+            sn=[self.sn],
+            homedirectory=[u'/home/%s' % self.uid],
+            displayname=[u'%s %s' % (self.givenname, self.sn)],
+            cn=[u'%s %s' % (self.givenname, self.sn)],
+            initials=[u'%s%s' % (self.givenname[0], self.sn[0])],
+            objectclass=objectclasses.user,
+            description=[u'__no_upg__'],
+            ipauniqueid=[fuzzy_uuid],
+            uidnumber=[fuzzy_digits],
+            gidnumber=[fuzzy_digits],
+            krbprincipalname=[u'%s@%s' % (self.uid, self.api.env.realm)],
+            mail=[u'%s@%s' % (self.uid, self.api.env.domain)],
+            gecos=[u'%s %s' % (self.givenname, self.sn)],
+            loginshell=[u'/bin/sh'],
+            has_keytab=False,
+            has_password=False,
+            mepmanagedentry=[get_group_dn(self.uid)],
+            memberof_group=[u'ipausers'],
+            )
+        
+        for key in self.kwargs:
+            if key == u'krbprincipalname':
+                self.attrs[key] = [u'%s@%s' % ((self.kwargs[key].split('@'))[0].lower(), (self.kwargs[key].split('@'))[1])]
+            else:
+                self.attrs[key] = [self.kwargs[key]]
+
+        self.exists = True
+
+    def check_create(self, result):
+        """ Check 'user-add' command result """
+        assert_deepequal(dict(
+            value=self.uid,
+            summary=u'Added user "%s"' % self.uid,
+            result=self.filter_attrs(self.create_keys),
+            ), result)
+
+    def check_delete(self, result):
+        """ Check 'user-del' command result """
+        assert_deepequal(dict(
+            value=[self.uid],
+            summary=u'Deleted user "%s"' % self.uid,
+            result=dict(failed=[]),
+            ), result)
+
+    def check_retrieve(self, result, all=False):
+        """ Check 'user-show' command result """
+
+        if u'preserved' in self.attrs and self.attrs[u'preserved']:
+            self.retrieve_all_keys = self.retrieve_preserved_all_keys
+            self.retrieve_keys = self.retrieve_preserved_keys
+        elif u'preserved' not in self.attrs and all:
+            self.attrs[u'preserved'] = False
+
+        if all:
+            expected = self.filter_attrs(self.retrieve_all_keys)
+        else:
+            expected = self.filter_attrs(self.retrieve_keys)
+
+        # small override because stageuser-find returns different type of nsaccountlock value than DS, but overall the value fits expected result
+        if u'nsaccountlock' in expected:
+            if expected[u'nsaccountlock'] == [u'true']:     
+                expected[u'nsaccountlock'] = True
+            elif expected[u'nsaccountlock'] == [u'false']:
+                expected[u'nsaccountlock'] = False
+
+        assert_deepequal(dict(
+            value=self.uid,
+            summary=None,
+            result=expected,
+        ), result)
+
+    def check_find(self, result, all=False, raw=False):
+        """ Check 'user-find' command result """
+        self.attrs[u'nsaccountlock'] = True
+        self.attrs[u'preserved'] = True
+
+        if all:
+            expected = self.filter_attrs(self.find_all_keys)
+        else:
+            expected = self.filter_attrs(self.find_keys)
+
+        assert_deepequal(dict(
+            count=1,
+            truncated=False,
+            summary=u'1 user matched',
+            result=[expected],
+        ), result)
+
+    def check_find_nomatch(self, result):
+        """ Check 'user-find' command result when no user should be found """
+        assert_deepequal(dict(
+            count=0,
+            truncated=False,
+            summary=u'0 users matched',
+            result=[],
+        ), result)
+
+    def check_update(self, result, extra_keys=()):
+        """ Check 'user-mod' command result """
+        assert_deepequal(dict(
+            value=self.uid,
+            summary=u'Modified user "%s"' % self.uid,
+            result=self.filter_attrs(self.update_keys | set(extra_keys))
+        ), result)
+
+    def create_from_staged(self, stageduser):
+        self.attrs = stageduser.attrs
+        self.uid = stageduser.uid
+        self.givenname = stageduser.givenname
+        self.sn = stageduser.sn
+        
+        self.attrs[u'mepmanagedentry'] = None
+        self.attrs[u'dn'] = self.dn
+        self.attrs[u'ipauniqueid'] = [fuzzy_uuid]
+        self.attrs[u'memberof'] = [u'cn=ipausers,%s,%s' % (api.env.container_group, api.env.basedn)]
+        self.attrs[u'mepmanagedentry'] = [u'cn=%s,%s,%s' % (self.uid, api.env.container_group, api.env.basedn)]
+        self.attrs[u'objectclass'] = objectclasses.user
+        if self.attrs[u'gidnumber'] == [u'-1']:
+            self.attrs[u'gidnumber'] = [fuzzy_digits]
+        if self.attrs[u'uidnumber'] == [u'-1']:
+            self.attrs[u'uidnumber'] = [fuzzy_digits]
+
+        if u'ipasshpubkey' in self.kwargs:
+                self.attrs[u'ipasshpubkey'] = [str(self.kwargs[u'ipasshpubkey'])]
+
+    def check_activate(self, result):
+        """ Check 'stageuser-activate' command result """
+        expected = dict(
+            value=self.uid,
+            summary=u'Stage user %s activated' % self.uid,
+            result=self.filter_attrs(self.activate_keys))
+
+        # work around to eliminate inconsistency in returned objectclass (case sensitive assertion)
+        expected['result']['objectclass'] = [item.lower() for item in expected['result']['objectclass']]
+        result['result']['objectclass'] = [item.lower() for item in result['result']['objectclass']]
+
+        assert_deepequal(expected, result)
+
+        self.exists = True
+
+    def check_undel(self, result):
+        assert_deepequal(dict(
+            value=self.uid,
+            summary=u'Undeleted user account "%s"' % self.uid,
+            result=True
+            ), result)
+
+    def track_delete(self, preserve=False):
+        """Update expected state for host deletion"""
+        if preserve:
+            self.exists = True
+        else:
+            self.exists = False
+        self.attrs = {}
+    
+    def delete_preserve(self):
+        self.ensure_exists()
+        self.track_delete(preserve=True)
+        command = self.make_delete_command(no_preserve=False, preserve=True)
+        result = command()
+        self.check_delete(result)
\ No newline at end of file
-- 
1.9.3



-- 
Manage your subscription for the Freeipa-devel mailing list:
https://www.redhat.com/mailman/listinfo/freeipa-devel
Contribute to FreeIPA: http://www.freeipa.org/page/Contribute/Code

Reply via email to