Hi,

I updated the (stage)user tests to reflect the multiple managers per user feature.
Corresponding ticket: https://fedorahosted.org/freeipa/ticket/5344

Lenka
From 03b4673debf019562d00d0c0f4cfcf3f295fa612 Mon Sep 17 00:00:00 2001
From: Lenka Doudova <ldoud...@redhat.com>
Date: Mon, 14 Dec 2015 06:51:47 +0100
Subject: [PATCH] Tests: Multiple managers per user

Adding Tracker methods and new tests to test multiple managers per (stage)user feature.
https://fedorahosted.org/freeipa/ticket/5344
---
 ipatests/test_xmlrpc/test_stageuser_plugin.py    | 127 +++++++++++++++++++++++
 ipatests/test_xmlrpc/test_user_plugin.py         |  96 +++++++++++++++++
 ipatests/test_xmlrpc/tracker/stageuser_plugin.py |  97 +++++++++++++++++
 ipatests/test_xmlrpc/tracker/user_plugin.py      | 117 ++++++++++++++++++++-
 4 files changed, 434 insertions(+), 3 deletions(-)

diff --git a/ipatests/test_xmlrpc/test_stageuser_plugin.py b/ipatests/test_xmlrpc/test_stageuser_plugin.py
index 42ecf046884369bd74b03194937c425215b99f6c..e597acf0d32a2cf707d69eced655d5e34cb3a3be 100644
--- a/ipatests/test_xmlrpc/test_stageuser_plugin.py
+++ b/ipatests/test_xmlrpc/test_stageuser_plugin.py
@@ -673,3 +673,130 @@ class TestGroups(XMLRPC_test):
         command = group.make_add_member_command(options={u'user': user.uid})
         result = command()
         group.check_add_member_negative(result)
+
+
+@pytest.fixture(scope='class')
+def stageduser_with_manager(request):
+    tracker = StageUserTracker(u'tuser1', u'test', u'user')
+    return tracker.make_fixture_activate(request)
+
+
+@pytest.fixture(scope='class')
+def manager1(request):
+    tracker = UserTracker(u'manager1', u'test', u'manager')
+    return tracker.make_fixture(request)
+
+
+@pytest.fixture(scope='class')
+def manager2(request):
+    tracker = UserTracker(u'manager2', u'test', u'manager')
+    return tracker.make_fixture(request)
+
+
+@pytest.fixture(scope='class')
+def user_activated_with_manager(request):
+    tracker = UserTracker(u'tuser1', u'test', u'user')
+    return tracker.make_fixture(request)
+
+
+@pytest.mark.tier1
+class TestMultipleManagers(XMLRPC_test):
+    def test_add_manager(
+            self, stageduser_with_manager, manager1, manager2,
+            user_activated_with_manager):
+        stageduser_with_manager.ensure_exists()
+        manager1.ensure_exists()
+        manager2.ensure_exists()
+        stageduser_with_manager.add_manager(
+            managers=[manager1.uid, manager2.uid])
+
+        # verify managers are preserved after activation
+        user_activated_with_manager.activate(stageduser_with_manager)
+        user_activated_with_manager.delete()
+
+    def test_add_duplicate_manager(self, stageduser, manager1):
+        stageduser.ensure_exists()
+        manager1.ensure_exists()
+        stageduser.add_manager(
+            managers=[manager1.uid, manager1.uid],
+            expected_fail={manager1.uid: u'already_manager'})
+        stageduser.delete()
+
+    def test_add_nonexistent_manager(self, stageduser, manager1):
+        stageduser.ensure_exists()
+        manager1.ensure_missing()
+        stageduser.add_manager(
+            managers=[manager1.uid],
+            expected_fail={manager1.uid: 'nonexistent'})
+        stageduser.delete()
+
+    def test_remove_manager(self, stageduser, manager1, manager2):
+        stageduser.ensure_exists()
+        manager1.ensure_exists()
+        manager2.ensure_exists()
+        stageduser.add_manager(managers=[manager1.uid, manager2.uid])
+        stageduser.remove_manager(managers=[manager1.uid])
+        stageduser.delete()
+
+    def test_remove_nonmanager(self, stageduser, manager1):
+        stageduser.ensure_exists()
+        manager1.ensure_exists()
+        stageduser.remove_manager(
+            managers=[manager1.uid],
+            expected_fail={manager1.uid: u'not_manager'})
+        stageduser.delete()
+
+    def test_remove_nonexistent(self, stageduser, manager1):
+        stageduser.ensure_exists()
+        manager1.ensure_missing()
+        stageduser.remove_manager(
+            managers=[manager1.uid],
+            expected_fail=[manager1.uid])
+        stageduser.delete()
+
+    def test_show_multiple_managers(self, stageduser, manager1, manager2):
+        """ Verifies that multiple managers are listed properly
+        on 'stageuser-show' command """
+        stageduser.ensure_exists()
+        manager1.ensure_exists()
+        manager2.ensure_exists()
+        stageduser.add_manager(managers=[manager1.uid, manager2.uid])
+
+        command = stageduser.make_retrieve_command()
+        result = command()
+        stageduser.check_retrieve(result)
+        stageduser.delete()
+
+    def test_delete_all_managers(self, stageduser, manager1, manager2):
+        """ Verifies that all managers assigned to a user are removed
+        when original 'stageuser-mod' command is used with empty
+        --manager option """
+        stageduser.ensure_exists()
+        manager1.ensure_exists()
+        manager2.ensure_exists()
+        stageduser.add_manager(managers=[manager1.uid, manager2.uid])
+
+        updates = {u'manager': ""}
+        expected_updates = {u'manager': ""}
+        command = stageduser.make_update_command(updates)
+        result = command()
+        stageduser.attrs.update(updates)
+        stageduser.attrs.update(expected_updates)
+        del stageduser.attrs[u'manager']
+        stageduser.check_update(result, extra_keys=set(updates.keys()) |
+                                set(expected_updates.keys()))
+
+        stageduser.retrieve()
+
+    def test_delete_manager(self, stageduser, manager1, manager2):
+        """ Verifies that a manager is removed from staged user,
+        when deleted """
+        stageduser.ensure_exists()
+        manager1.ensure_exists()
+        manager2.ensure_exists()
+
+        stageduser.add_manager(managers=[manager1.uid, manager2.uid])
+        manager1.delete()
+        stageduser.attrs[u'manager'].remove(manager1.uid)
+        stageduser.retrieve(all=True)
+        stageduser.delete()
diff --git a/ipatests/test_xmlrpc/test_user_plugin.py b/ipatests/test_xmlrpc/test_user_plugin.py
index 084fb83c42d362204ff4547357226c8f56d217fb..655ac9e1a48469f23962bb28d30ebc3bb9f73ad0 100644
--- a/ipatests/test_xmlrpc/test_user_plugin.py
+++ b/ipatests/test_xmlrpc/test_user_plugin.py
@@ -35,6 +35,7 @@ from xmlrpc_test import (
     XMLRPC_test, Declarative, fuzzy_digits, fuzzy_uuid, fuzzy_password,
     fuzzy_string, fuzzy_dergeneralizedtime, add_sid, add_oc)
 from ipapython.dn import DN
+from ipatests.test_xmlrpc.tracker.user_plugin import UserTracker
 import pytest
 
 user1 = u'tuser1'
@@ -1652,3 +1653,98 @@ class test_denied_bind_with_expired_principal(XMLRPC_test):
                 krbprincipalexpiration=principal_expiration_string)
 
         self.connection.simple_bind_s(str(get_user_dn(user1)), self.password)
+
+
+@pytest.fixture(scope='class')
+def user(request):
+    tracker = UserTracker(u'tuser1', u'test', u'user')
+    return tracker.make_fixture(request)
+
+
+@pytest.fixture(scope='class')
+def manager1(request):
+    tracker = UserTracker(u'manager1', u'test', u'manager')
+    return tracker.make_fixture(request)
+
+
+@pytest.fixture(scope='class')
+def manager2(request):
+    tracker = UserTracker(u'manager2', u'test', u'manager')
+    return tracker.make_fixture(request)
+
+
+@pytest.mark.tier1
+class TestMultipleManagers(XMLRPC_test):
+    def test_add_manager(
+            self, user, manager1, manager2):
+        user.ensure_exists()
+        manager1.ensure_exists()
+        manager2.ensure_exists()
+        user.add_manager(
+            managers=[manager1.uid, manager2.uid])
+        user.delete()
+
+    def test_add_duplicate_manager(self, user, manager1):
+        user.ensure_exists()
+        manager1.ensure_exists()
+        user.add_manager(
+            managers=[manager1.uid, manager1.uid],
+            expected_fail={manager1.uid: u'already_manager'})
+        user.delete()
+
+    def test_add_nonexistent_manager(self, user, manager1):
+        user.ensure_exists()
+        manager1.ensure_missing()
+        user.add_manager(
+            managers=[manager1.uid],
+            expected_fail={manager1.uid: 'nonexistent'})
+        user.delete()
+
+    def test_remove_manager(self, user, manager1, manager2):
+        user.ensure_exists()
+        manager1.ensure_exists()
+        manager2.ensure_exists()
+        user.add_manager(managers=[manager1.uid, manager2.uid])
+        user.remove_manager(managers=[manager1.uid])
+        user.delete()
+
+    def test_remove_nonmanager(self, user, manager1):
+        user.ensure_exists()
+        manager1.ensure_exists()
+        user.remove_manager(
+            managers=[manager1.uid],
+            expected_fail={manager1.uid: u'not_manager'})
+        user.delete()
+
+    def test_remove_nonexistent(self, user, manager1):
+        user.ensure_exists()
+        manager1.ensure_missing()
+        user.remove_manager(
+            managers=[manager1.uid],
+            expected_fail=[manager1.uid])
+        user.delete()
+
+    def test_show_multiple_managers(self, user, manager1, manager2):
+        """ Verifies that multiple managers are listed properly
+        on 'user-show' command """
+        user.ensure_exists()
+        manager1.ensure_exists()
+        manager2.ensure_exists()
+        user.add_manager(managers=[manager1.uid, manager2.uid])
+
+        command = user.make_retrieve_command()
+        result = command()
+        user.check_retrieve(result)
+        user.delete()
+
+    def test_delete_manager(self, user, manager1, manager2):
+        """ Verifies that a manager is removed from user, when deleted """
+        user.ensure_exists()
+        manager1.ensure_exists()
+        manager2.ensure_exists()
+
+        user.add_manager(managers=[manager1.uid, manager2.uid])
+        manager1.delete()
+        user.attrs[u'manager'].remove(manager1.uid)
+        user.retrieve(all=True)
+        user.delete()
diff --git a/ipatests/test_xmlrpc/tracker/stageuser_plugin.py b/ipatests/test_xmlrpc/tracker/stageuser_plugin.py
index 0f7eadd040d5fd12958a0651ddf00cbc4055fa1f..88cb619cf940c2ab3b1b51b94205a9fce2b2f6fc 100644
--- a/ipatests/test_xmlrpc/tracker/stageuser_plugin.py
+++ b/ipatests/test_xmlrpc/tracker/stageuser_plugin.py
@@ -57,6 +57,8 @@ class StageUserTracker(Tracker):
     update_keys = retrieve_keys - {u'dn', u'nsaccountlock'}
     activate_keys = retrieve_keys | {
         u'has_keytab', u'has_password', u'nsaccountlock'}
+    manager_keys = (retrieve_keys | {u'manager'}) - {
+        u'has_password', u'has_keytab'}
 
     def __init__(self, name, givenname, sn, **kwargs):
         super(StageUserTracker, self).__init__(default_version=None)
@@ -97,6 +99,15 @@ class StageUserTracker(Tracker):
             using stageuser-activate """
         return self.make_command('stageuser_activate', self.uid)
 
+    def make_add_manager_command(self, managers):
+        """ Make function that adds a manager to a staged user """
+        return self.make_command('stageuser_add_manager', self.uid, **managers)
+
+    def make_remove_manager_command(self, managers):
+        """ Make function that removes a manager from a staged user """
+        return self.make_command(
+            'stageuser_remove_manager', self.uid, **managers)
+
     def track_create(self):
         """ Update expected state for staged user creation """
         self.attrs = dict(
@@ -231,6 +242,92 @@ class StageUserTracker(Tracker):
             result=dict(failed=[]),
         ), result)
 
+    def track_add_manager(self, managers, expected_fail={}):
+        """ Update expected state of staged user after adding a manager """
+        if u'manager' in self.attrs:
+            [self.attrs[u'manager'].append(item) for item in managers]
+        else:
+            self.attrs[u'manager'] = managers
+        self.attrs[u'managers_added'] = len(managers) - len(expected_fail)
+        self.attrs[u'managers_failed'] = {}
+        for key in expected_fail:
+            if expected_fail[key] == u'already_manager':
+                self.attrs[u'managers_failed'][key] =\
+                    u'This entry is already a member'
+            elif expected_fail[key] == u'nonexistent':
+                self.attrs[u'managers_failed'][key] = u'no such entry'
+            self.attrs[u'manager'].remove(key)
+        if self.attrs[u'manager'] == []:
+            del self.attrs[u'manager']
+
+    def check_add_manager(self, result):
+        """ Check 'stageuser-add-manager' command result """
+        completed = self.attrs[u'managers_added']
+        failed = {u'manager': {u'user': []}}
+        if len(self.attrs[u'managers_failed']) != 0:
+            for item in self.attrs[u'managers_failed']:
+                failed[u'manager'][u'user'].append(
+                    [item, self.attrs[u'managers_failed'][item]])
+
+        assert_deepequal(dict(
+            failed=failed,
+            completed=completed,
+            result=self.filter_attrs(self.manager_keys)
+        ), result)
+
+    def add_manager(self, managers, expected_fail={}):
+        """ Complete procedure of performing 'stageuser-add-manager'
+        command and verifying its results """
+        command = self.make_add_manager_command(managers={u'user': managers})
+        result = command()
+        fails = {}
+        self.track_add_manager(
+            managers=managers,
+            expected_fail=expected_fail)
+        self.check_add_manager(result)
+
+    def track_remove_manager(self, managers, expected_fail=[]):
+        """ Update expected state of staged user after removing a manager """
+        if u'manager' in self.attrs:
+            for item in managers:
+                if item not in expected_fail:
+                    self.attrs[u'manager'].remove(item)
+
+        self.attrs[u'managers_removed'] = len(managers) - len(expected_fail)
+        self.attrs[u'managers_failed'] = {}
+        for item in expected_fail:
+            self.attrs[u'managers_failed'][item] =\
+                u'This entry is not a member'
+        if u'manager' in self.attrs and self.attrs[u'manager'] == []:
+            del self.attrs[u'manager']
+
+    def check_remove_manager(self, result):
+        """ Check 'stageuser-remove-manager' command result """
+        completed = self.attrs[u'managers_removed']
+        failed = {u'manager': {u'user': []}}
+        if len(self.attrs[u'managers_failed']) != 0:
+            for item in self.attrs[u'managers_failed']:
+                failed[u'manager'][u'user'].append(
+                    [item, self.attrs[u'managers_failed'][item]])
+
+        assert_deepequal(dict(
+            failed=failed,
+            completed=completed,
+            result=self.filter_attrs(self.manager_keys)
+        ), result)
+
+    def remove_manager(self, managers, expected_fail=[]):
+        """ Complete procedure of performing 'stageuser-remove-manager'
+        command and verifying its results """
+        command = self.make_remove_manager_command(
+            managers={u'user': managers})
+        result = command()
+        fails = {}
+        self.track_remove_manager(
+            managers=managers,
+            expected_fail=expected_fail)
+        self.check_remove_manager(result)
+
     def make_fixture_activate(self, request):
         """Make a pytest fixture for a staged user that is to be activated
 
diff --git a/ipatests/test_xmlrpc/tracker/user_plugin.py b/ipatests/test_xmlrpc/tracker/user_plugin.py
index bcae2ec787b3d3a54e34917f125be006094e07b0..2927b387b0f7164ae2d41d1f6c578040d9a0d48a 100644
--- a/ipatests/test_xmlrpc/tracker/user_plugin.py
+++ b/ipatests/test_xmlrpc/tracker/user_plugin.py
@@ -46,6 +46,8 @@ class UserTracker(Tracker):
 
     find_keys = retrieve_keys - {u'mepmanagedentry', u'memberof_group'}
     find_all_keys = retrieve_all_keys - {u'mepmanagedentry', u'memberof_group'}
+    manager_keys = (retrieve_keys | {u'manager'}) - {
+        u'has_password', u'has_keytab'}
 
     def __init__(self, name, givenname, sn, **kwargs):
         super(UserTracker, self).__init__(default_version=None)
@@ -108,6 +110,15 @@ class UserTracker(Tracker):
         staged container """
         return self.make_command('user_stage', self.uid)
 
+    def make_add_manager_command(self, managers):
+        """ Make function that adds a manager to a user """
+        return self.make_command('user_add_manager', self.uid, **managers)
+
+    def make_remove_manager_command(self, managers):
+        """ Make function that removes a manager from a user """
+        return self.make_command(
+            'user_remove_manager', self.uid, **managers)
+
     def track_create(self):
         """ Update expected state for user creation """
         self.attrs = dict(
@@ -161,7 +172,7 @@ class UserTracker(Tracker):
             result=dict(failed=[]),
             ), result)
 
-    def check_retrieve(self, result, all=False):
+    def check_retrieve(self, result, all=False, raw=False):
         """ Check 'user-show' command result """
 
         if u'preserved' in self.attrs and self.attrs[u'preserved']:
@@ -183,6 +194,8 @@ class UserTracker(Tracker):
                 expected[u'nsaccountlock'] = True
             elif expected[u'nsaccountlock'] == [u'false']:
                 expected[u'nsaccountlock'] = False
+        else:
+            expected[u'nsaccountlock'] = False
 
         assert_deepequal(dict(
             value=self.uid,
@@ -260,8 +273,8 @@ class UserTracker(Tracker):
             result=self.filter_attrs(self.activate_keys))
 
         if 'manager' in expected['result']:
-            expected['result']['manager'] = [
-                unicode(get_user_dn(expected['result']['manager'][0]))]
+            for i, item in enumerate(expected['result']['manager']):
+                expected['result'][u'manager'][i] = unicode(get_user_dn(item))
 
         # work around to eliminate inconsistency in returned objectclass
         # (case sensitive assertion)
@@ -343,3 +356,101 @@ class UserTracker(Tracker):
         request.addfinalizer(finish)
 
         return self
+
+    def activate(self, stageduser):
+        self.ensure_missing()
+        self.create_from_staged(stageduser)
+        command = stageduser.make_activate_command()
+        result = command()
+        self.check_activate(result)
+
+        command = stageduser.make_retrieve_command()
+        with raises_exact(errors.NotFound(
+                reason=u'%s: stage user not found' % stageduser.uid)):
+            command()
+
+    def track_add_manager(self, managers, expected_fail={}):
+        """ Update expected state of staged user after adding a manager """
+        if u'manager' in self.attrs:
+            [self.attrs[u'manager'].append(item) for item in managers]
+        else:
+            self.attrs[u'manager'] = managers
+        self.attrs[u'managers_added'] = len(managers) - len(expected_fail)
+        self.attrs[u'managers_failed'] = {}
+        for key in expected_fail:
+            if expected_fail[key] == u'already_manager':
+                self.attrs[u'managers_failed'][key] =\
+                    u'This entry is already a member'
+            elif expected_fail[key] == u'nonexistent':
+                self.attrs[u'managers_failed'][key] = u'no such entry'
+            self.attrs[u'manager'].remove(key)
+        if self.attrs[u'manager'] == []:
+            del self.attrs[u'manager']
+
+    def check_add_manager(self, result):
+        """ Check 'stageuser-add-manager' command result """
+        completed = self.attrs[u'managers_added']
+        failed = {u'manager': {u'user': []}}
+        if len(self.attrs[u'managers_failed']) != 0:
+            for item in self.attrs[u'managers_failed']:
+                failed[u'manager'][u'user'].append(
+                    [item, self.attrs[u'managers_failed'][item]])
+
+        assert_deepequal(dict(
+            failed=failed,
+            completed=completed,
+            result=self.filter_attrs(self.manager_keys)
+        ), result)
+
+    def add_manager(self, managers, expected_fail={}):
+        """ Complete procedure of performing 'stageuser-add-manager'
+        command and verifying its results """
+        command = self.make_add_manager_command(managers={u'user': managers})
+        result = command()
+        fails = {}
+        self.track_add_manager(
+            managers=managers,
+            expected_fail=expected_fail)
+        self.check_add_manager(result)
+
+    def track_remove_manager(self, managers, expected_fail=[]):
+        """ Update expected state of staged user after removing a manager """
+        if u'manager' in self.attrs:
+            for item in managers:
+                if item not in expected_fail:
+                    self.attrs[u'manager'].remove(item)
+
+        self.attrs[u'managers_removed'] = len(managers) - len(expected_fail)
+        self.attrs[u'managers_failed'] = {}
+        for item in expected_fail:
+            self.attrs[u'managers_failed'][item] =\
+                u'This entry is not a member'
+        if u'manager' in self.attrs and self.attrs[u'manager'] == []:
+            del self.attrs[u'manager']
+
+    def check_remove_manager(self, result):
+        """ Check 'stageuser-remove-manager' command result """
+        completed = self.attrs[u'managers_removed']
+        failed = {u'manager': {u'user': []}}
+        if len(self.attrs[u'managers_failed']) != 0:
+            for item in self.attrs[u'managers_failed']:
+                failed[u'manager'][u'user'].append(
+                    [item, self.attrs[u'managers_failed'][item]])
+
+        assert_deepequal(dict(
+            failed=failed,
+            completed=completed,
+            result=self.filter_attrs(self.manager_keys)
+        ), result)
+
+    def remove_manager(self, managers, expected_fail=[]):
+        """ Complete procedure of performing 'stageuser-remove-manager'
+        command and verifying its results """
+        command = self.make_remove_manager_command(
+            managers={u'user': managers})
+        result = command()
+        fails = {}
+        self.track_remove_manager(
+            managers=managers,
+            expected_fail=expected_fail)
+        self.check_remove_manager(result)
-- 
2.4.3

-- 
Manage your subscription for the Freeipa-devel mailing list:
https://www.redhat.com/mailman/listinfo/freeipa-devel
Contribute to FreeIPA: http://www.freeipa.org/page/Contribute/Code

Reply via email to