Patch 0146 implements lower-level infrastructure for querying server roles/attributes.

Patch 0147 adds a basic test suite for the `serverroles` backend to ensure that it works as expected.

The new/modified CLI commands specified in the design page [1] will be coming soon.

Also coming soon is an interface for constructing DNS records for each role, as requested by Petr^2 and Martin^2. I will start implementing it once we agree on the backend implementation.

https://fedorahosted.org/freeipa/ticket/5181

[1] https://www.freeipa.org/page/V4/Server_Roles#CLI

--
Martin^3 Babinsky
From 1b9c4c0462bdef291c50e82f974441a56a3b5db8 Mon Sep 17 00:00:00 2001
From: Martin Babinsky <mbabi...@redhat.com>
Date: Thu, 19 May 2016 11:24:18 +0200
Subject: [PATCH 1/2] Server Roles: infrastructure for role/attribute
 definition and query

This patch implements basic infrastructure for querying and manipulating
server roles and server attributes. This consists of:

    * class hierarchy for definition of service/membership based roles and
      server attributes. The definition of these classes is briefly documented
      in the module docstring

    * `serverroles` backend which consumes the information provided by these
      classes and provides an API for actions such as displaying status of
      server roles on a master, listing all masters providing the
      role/attribute, setting/unsetting attribute on master, etc.

https://www.freeipa.org/page/V4/Server_Roles
https://fedorahosted.org/freeipa/ticket/5181
---
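Note (illustration only, not part of the patch): the registry/factory
mechanism described above can be sketched without any FreeIPA imports.
`FakeRole` and `fake_api` are hypothetical stand-ins for the real role base
classes and the `api` object; only `role_registry`, `register_role` and
`role_factory` mirror names from the patch.

```python
# Stand-alone sketch of the registry/factory pattern; no FreeIPA imports.
role_registry = {}


def register_role(cls):
    """Class decorator: store the class under its lower-cased name."""
    role_registry[cls.name.lower()] = cls
    return cls


def role_factory(api_instance):
    """Instantiate every registered role class for the given API object."""
    return {key: role_cls(api_instance)
            for key, role_cls in role_registry.items()}


class FakeRole(object):
    """Hypothetical stand-in for the role base classes in the patch."""
    name = None

    def __init__(self, api_instance):
        self.api = api_instance


@register_role
class ExampleRole(FakeRole):
    name = "Example Role"


fake_api = object()  # placeholder for the real `api` instance
roles = role_factory(fake_api)
```

Keeping classes (not instances) in the registry means the API object is
supplied only when the backend builds its stores.
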
 ipalib/errors.py                 |  29 +-
 ipaserver/plugins/serverroles.py | 769 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 797 insertions(+), 1 deletion(-)
 create mode 100644 ipaserver/plugins/serverroles.py
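
Note (illustration only, not part of the patch): the intended hand-over of a
singular attribute in `set_provider` can be modelled in memory roughly as
follows. `SingularAttribute`, `AlreadyProvides` and the host names are
hypothetical; a plain set replaces the LDAP lookups, and the sketch assumes
the attribute may start with no provider at all.

```python
class AlreadyProvides(Exception):
    """Hypothetical stand-in for errors.MasterAlreadyProvidesRole."""


class SingularAttribute(object):
    """Hypothetical in-memory model; a set replaces the LDAP searches."""

    def __init__(self, eligible_masters):
        # masters on which the associated role is enabled
        self.eligible = set(eligible_masters)
        self._masters = set()

    @property
    def providers(self):
        return sorted(self._masters)

    def set_provider(self, hostname):
        if hostname in self._masters:
            raise AlreadyProvides(hostname)
        if hostname not in self.eligible:
            raise ValueError(
                "%s does not provide the associated role" % hostname)

        result = {'old_providers': self.providers}
        self._masters.clear()        # singular: unset the previous master
        self._masters.add(hostname)  # then set the new one
        result['new_providers'] = self.providers
        return result


attr = SingularAttribute(['m1.example.test', 'm2.example.test'])
first = attr.set_provider('m1.example.test')
second = attr.set_provider('m2.example.test')
```

The returned dictionary has the same two keys as in the patch, so a caller
can report which master lost the attribute and which one gained it.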

diff --git a/ipalib/errors.py b/ipalib/errors.py
index 52fa25f02e02d1d71c012f32d761b64a838917be..01340cbc540c87fca72fc8925a4b98255ff04611 100644
--- a/ipalib/errors.py
+++ b/ipalib/errors.py
@@ -1384,6 +1384,33 @@ class InvalidDomainLevelError(ExecutionError):
     errno = 4032
     format = _('%(reason)s')
 
+
+class MasterAlreadyProvidesRole(ExecutionError):
+    """
+    **4033** Raised when setting role/attribute on a master that already
+             provides it.
+    """
+    errno = 4033
+    format = _('%(hostname)s is already %(role)s')
+
+
+class MasterDoesNotProvideRole(ExecutionError):
+    """
+    **4034** Raised when removing role/attribute from a master that does not
+             provide it.
+    """
+    errno = 4034
+    format = _('%(hostname)s is not %(role)s')
+
+
+class ReadOnlyServerAttribute(ExecutionError):
+    """
+    **4035** Raised when setting/unsetting read-only attribute on a server.
+    """
+    errno = 4035
+    format = _("%(attribute)s can not be set/unset using LDAP operations only")
+
+
 class BuiltinError(ExecutionError):
     """
     **4100** Base class for builtin execution errors (*4100 - 4199*).
@@ -1812,7 +1839,7 @@ class CertificateInvalidError(CertificateError):
     """
     **4310** Raised when a certificate is not valid
 
-    For example:
+    For example
 
     >>> raise CertificateInvalidError(name=_(u'CA'))
     Traceback (most recent call last):
diff --git a/ipaserver/plugins/serverroles.py b/ipaserver/plugins/serverroles.py
new file mode 100644
index 0000000000000000000000000000000000000000..86e6b626dde4ef3802b6c02d2f7c038f519d02fa
--- /dev/null
+++ b/ipaserver/plugins/serverroles.py
@@ -0,0 +1,769 @@
+#
+# Copyright (C) 2016 FreeIPA Contributors see COPYING for license
+#
+
+
+"""
+This module contains the `serverroles` backend which serves as an API for
+retrieval (and sometimes manipulation) of functionality provided by masters
+present in FreeIPA topology. The backend itself leverages a set of classes
+which abstract various bits and pieces of information present in the LDAP tree
+about functionalities such as DNS server, Active Directory trust controller
+etc. These properties come in two distinct groups:
+
+    server roles
+        this group represents a general functionality provided by one or more
+        IPA servers, such as DNS server, certificate authority and such. In
+        this case there is a many-to-many mapping between the roles and the
+        masters which provide them.
+
+    server attributes
+        these are currently associated only to one server among the topology
+        (singular attributes). For example, there is only one CA renewal
+        master and one DNSSec key master in the topology.
+
+See the corresponding design page (http://www.freeipa.org/page/V4/Server_Roles)
+for more info.
+
+Both of these groups use `LDAPBasedProperty` class as a base.
+
+Server Roles
+============
+
+Server roles come in two flavors. Those that are consuming information from the
+master's service container (cn=FQDN,cn=masters,cn=ipa,cn=etc,$SUFFIX) inherit
+from the `ServiceBasedRole` subclass. To create such role, you only need to
+specify role name and individual services comprising the role (more systemd
+services may be needed to provide some function):
+
+>>> class ExampleRole(ServiceBasedRole):
+...     name = "Example Role"
+...     component_services = {'SERVICE1', 'SERVICE2'}
+
+Roles may also be based on membership of the host or some of its services in
+some group. These should inherit from `MembershipBasedRole` and the process is
+a bit more involved, since you have to specify type of the object associated
+with the group (host or service), group DN and, optionally, override methods
+which convert FQDN to the object primary key and get FQDN back from the entry:
+
+>>> from ipapython.dn import DN
+>>> class AnotherRole(MembershipBasedRole):
+...     name = "Another role based on membership"
+...     member_obj = 'host'
+...
+...     @property
+...     def group_dn(self):
+...         return DN(('cn', 'some group'), self.api.env.container_hostgroups,
+...                   self.api.env.basedn)
+...
+...     def fqdn_from_entry(self, entry):
+...         return entry.single_value.get('fqdn')
+
+Both role types then provide `providers` property to list all masters on which
+the role is enabled and `status` method that probes role status on the
+master.
+
+There is a module-level registry `role_registry` which hosts roles available
+for use to the backend. Use `register_role` decorator to add your role to this
+registry:
+
+>>> @register_role
+... class YetAnotherRole(ServiceBasedRole):
+...     name = "Yet Another Role"
+...     component_services = {'YAS'}
+
+The registry keys roles by their lower-cased names.
+
+>>> 'yet another role' in role_registry
+True
+
+Server Attributes
+=================
+
+The current implementation of server attributes which inherit from
+`BaseServerAttribute` class assumes that the attribute is defined by some value
+of 'ipaConfigString' attribute on entry in master's service container. To
+create your own server attribute class, see the following example:
+
+>>> @register_attribute
+... class ExampleAttribute(BaseServerAttribute):
+...     name = "Example Attribute"
+...     singular = True
+...     read_only = False
+...     associated_role = ExampleRole
+...     associated_service_name = 'SERVICE1'
+...     ipa_config_string_value = 'roleMaster'
+
+The list of all masters with this attribute set can be accessed via `providers`
+property (a one-element list in the case of singular ones). If the attribute is
+not read-only, you can use `set_provider` and `unset_provider` methods to
+add/remove masters providing the attribute. In the case of singular attribute,
+`set_provider` will automatically unset the attribute on the previous master.
+
+NOTE: Both of these methods will raise an exception when used on read-only
+attribute.
+
+The attributes have their own registry which is populated using
+`register_attribute` decorator:
+
+>>> 'example attribute' in attribute_registry
+True
+
+Using roles/attributes from the backend
+=======================================
+
+The `serverroles` backend has access to all roles and attributes stored in
+module-level registries. It uses `role_factory/attribute_factory` functions to
+populate its internal stores with instances of the role/attribute classes. The
+information contained in them can be accessed by the following methods:
+
+    *api.Backend.serverroles.find_roles(hostname, substring)*
+        search for roles containing `substring` on master `hostname` and
+        show their status (or nothing if no match was found)
+
+    *api.Backend.serverroles.get_role_status(hostname, role_names)*
+        show status of role(s) on the master
+
+    *api.Backend.serverroles.get_role_providers(role_names)*
+        get list of providing masters for one or more roles
+
+    *api.Backend.serverroles.get_provided_roles(hostname)*
+        get list of all roles enabled on the master
+
+    *api.Backend.serverroles.get_attribute_providers(attr_names)*
+        get list of all masters which have the specified attribute names
+        set on them
+
+    *api.Backend.serverroles.set_attribute_provider(hostname, attr_name)*
+        set the attribute `attr_name` on master `hostname`
+
+Note that attribute/role names are searched/matched case-insensitively. Also
+note that the `serverroles` backend does not create/destroy any LDAP connection
+by itself, so make sure `ldap2` backend connections are taken care of
+in the calling code.
+"""
+
+
+import abc
+from collections import namedtuple, defaultdict
+
+import six
+
+from ipalib import errors, _
+from ipalib.backend import Backend
+from ipalib.plugable import Registry
+from ipapython.dn import DN
+
+
+if six.PY3:
+    unicode = str
+
+
+ENABLED = u'enabled'
+CONFIGURED = u'configured'
+ABSENT = u'absent'
+
+
+role_registry = {}
+
+attribute_registry = {}
+
+
+def register_role(cls):
+    """
+    Class decorator which registers the server role class in the module level
+    registry
+    """
+    role_registry[cls.name.lower()] = cls
+    return cls
+
+
+def register_attribute(cls):
+    """
+    Class decorator which registers the server attribute class in the module
+    level registry
+    """
+    attribute_registry[cls.name.lower()] = cls
+    return cls
+
+
+def role_factory(api_instance):
+    """
+    return a dict of server role instances keyed by lower-cased role name
+    """
+
+    return {key: role_cls(api_instance) for key, role_cls in
+            role_registry.items()}
+
+
+def attribute_factory(api_instance):
+    """
+    return a dict of server attribute instances keyed by lower-cased attribute
+    name
+    """
+    return {key: attr_cls(api_instance) for key, attr_cls in
+            attribute_registry.items()}
+
+
+@six.add_metaclass(abc.ABCMeta)
+class LDAPBasedProperty(object):
+    """
+    base class for all master properties defined by LDAP content
+    :param name: user-friendly name of the property
+    :param attrs_list: list of attributes to retrieve during search, defaults
+        to all
+    """
+    name = None
+    attrs_list = ['*']
+
+    def __init__(self, api_instance):
+        self.api = api_instance
+        self.ldap_conn = self.api.Backend.ldap2
+
+    @property
+    def search_filter(self):
+        """
+        filter used in `search` for LDAP entry retrieval
+
+        :returns: search filter computed from other instance attributes
+            or default filter: (objectclass=*)
+        """
+        return "(objectclass=*)"
+
+    @abc.abstractproperty
+    def search_base(self):
+        return None
+
+    def search(self, search_base):
+        """
+        retrieve LDAP entries used for further processing
+        """
+        entries = self.ldap_conn.get_entries(
+            search_base,
+            filter=self.search_filter,
+            attrs_list=self.attrs_list
+        )
+        return entries
+
+
+@six.add_metaclass(abc.ABCMeta)
+class BaseServerRole(LDAPBasedProperty):
+    """
+    Server role hierarchy apex. All other server role definitions should either
+    inherit from it or at least provide the 'status' method and 'providers'
+    property
+    """
+
+    @abc.abstractmethod
+    def status(self, hostname):
+        """
+        status of the role on IPA master
+
+        :param hostname: server FQDN
+        :returns: * 'enabled' if the role is enabled on the master
+                  * 'configured' if it is not enabled but has
+                    been configured by installer
+                  * 'absent' otherwise
+        """
+        pass
+
+    @abc.abstractproperty
+    def providers(self):
+        """
+        :returns: list of all masters providing the role (i.e. role is enabled
+        on them)
+        """
+        pass
+
+
+class BaseServerAttribute(LDAPBasedProperty):
+    """
+    Server attribute hierarchy apex
+
+    :param singular: if True, the attribute can have at most one master as
+        provider
+    :param read_only: attribute providers can not be added/removed using LDAP
+        interface only
+    :param associated_role: `ServerRole` class of a role which must be enabled
+        on the provider
+    :param associated_service_name: name of LDAP service on which the
+        attribute is set. Does not need to belong to the service entries
+        of the associated role
+    :param ipa_config_string_value: value of `ipaConfigString` attribute
+        associated with the presence of server attribute
+    """
+    singular = False
+    read_only = False
+
+    associated_role = None
+    associated_service_name = None
+    ipa_config_string_value = None
+
+    def __init__(self, api_instance):
+        super(BaseServerAttribute, self).__init__(api_instance)
+        # pylint: disable=not-callable
+        self.role_instance = self.associated_role(
+            api_instance)
+        # pylint: enable=not-callable
+
+    @property
+    def search_filter(self):
+        svc_filter = self.ldap_conn.make_filter_from_attr(
+            'cn', self.associated_service_name)
+
+        configstring_filter = self.ldap_conn.make_filter_from_attr(
+            'ipaConfigString', self.ipa_config_string_value)
+        return self.ldap_conn.combine_filters(
+            [svc_filter, configstring_filter], rules=self.ldap_conn.MATCH_ALL)
+
+    @property
+    def search_base(self):
+        return DN(self.api.env.container_masters, self.api.env.basedn)
+
+    @property
+    def providers(self):
+        """
+        :returns: list of masters providing the attribute. In the case of
+        singular attributes returns a list containing single object
+        """
+        try:
+            entries = self.search(self.search_base)
+        except errors.EmptyResult:
+            return []
+
+        masters = set(e.dn[1]['cn'] for e in entries)
+
+        if not masters.issubset(self.role_instance.providers):
+            raise errors.ExecutionError(
+                message=_("%(attribute)s attribute set on master(s) with "
+                          "%(roles)s not enabled") % dict(
+                    attribute=self.name, roles=self.role_instance.name))
+
+        if self.singular and len(masters) != 1:
+            raise errors.SingleMatchExpected(found=len(entries))
+
+        return sorted(masters)
+
+    def _get_master_dn(self, hostname):
+        try:
+            return self.api.Object.server.get_dn_if_exists(hostname)
+        except errors.NotFound:
+            self.api.Object.server.handle_not_found(hostname)
+
+    def _get_masters_service_entry(self, master_dn):
+        service_dn = DN(('cn', self.associated_service_name), master_dn)
+        return self.ldap_conn.get_entry(service_dn)
+
+    def _update_masters_svc_entry(self, service_entry, action='add'):
+        ipa_conf_string = service_entry.get('ipaConfigString', [])
+        if action == 'add':
+            ipa_conf_string.append(self.ipa_config_string_value)
+        elif action == 'remove':
+            ipa_conf_string.remove(self.ipa_config_string_value)
+        else:
+            raise ValueError('Invalid action for master entry update')
+
+        self.ldap_conn.update_entry(service_entry)
+
+    def unset_provider(self, hostname):
+        """
+        unset the master as a provider of the attribute
+
+        :param hostname: master FQDN
+        :raises: errors.MasterDoesNotProvideRole if the server does not
+            actually provide the attribute
+        """
+        if self.read_only:
+            raise errors.ReadOnlyServerAttribute(attribute=self.name)
+
+        master_dn = self._get_master_dn(hostname)
+        service_entry = self._get_masters_service_entry(master_dn)
+
+        if hostname not in self.providers:
+            raise errors.MasterDoesNotProvideRole(
+                hostname=hostname, role=self.name)
+
+        self._update_masters_svc_entry(service_entry, action='remove')
+
+    def set_provider(self, hostname):
+        """
+        set the IPA master as the attribute provider
+        :param hostname: master FQDN.
+
+        NOTE: singular attribute will be automatically unset on the previous
+        master and set on the new one
+
+        :returns: dictionary containing lists of old and new providers
+        :raises: * errors.MasterAlreadyProvidesRole if the master already has
+                   the attribute set
+                 * errors.ReadOnlyServerAttribute if the attribute is read-only
+        """
+        if self.read_only:
+            raise errors.ReadOnlyServerAttribute(attribute=self.name)
+
+        result = {'old_providers': self.providers}
+
+        master_dn = self._get_master_dn(hostname)
+
+        if hostname in self.providers:
+            raise errors.MasterAlreadyProvidesRole(
+                hostname=hostname, role=self.name
+            )
+
+        if hostname not in self.role_instance.providers:
+            raise errors.MasterDoesNotProvideRole(
+                hostname=hostname, role=self.role_instance.name)
+
+        # in the case of singular attribute unset the original master first
+        if self.singular and self.providers:
+            self.unset_provider(self.providers[0])
+
+        # then set up the attribute on the new master
+        service_entry = self._get_masters_service_entry(master_dn)
+        self._update_masters_svc_entry(service_entry, action='add')
+
+        result['new_providers'] = self.providers
+        return result
+
+
+_Service = namedtuple('Service', ['name', 'enabled'])
+
+
+class ServiceBasedRole(BaseServerRole):
+    """
+    base class for all roles whose status is defined by presence of one or
+    more entries in LDAP and/or their attributes
+    """
+    attrs_list = ('ipaConfigString', 'cn')
+    enabled_attrs_value = {'ipaConfigString': 'enabledService'}
+
+    component_services = set()
+
+    @property
+    def search_base(self):
+        return DN(self.api.env.container_masters, self.api.env.basedn)
+
+    @property
+    def search_filter(self):
+        return self.ldap_conn.make_filter_from_attr(
+            'cn',
+            tuple(self.component_services),
+            rules=self.ldap_conn.MATCH_ANY,
+            exact=True
+        )
+
+    def _validate_component_services(self, services):
+        svc_set = {s.name for s in services}
+        if svc_set != self.component_services:
+            raise ValueError(
+                "{}: Mismatch between component services and search result "
+                "(expected: {}, got: {})".format(
+                    self.__class__.__name__,
+                    ', '.join(sorted(self.component_services)),
+                    ', '.join(sorted(s.name for s in services))))
+
+    def _get_service(self, entry):
+        entry_cn = entry['cn'][0]
+
+        enabled = all(
+            value in entry.get(attribute, []) for (attribute, value) in
+            self.enabled_attrs_value.items()
+        )
+
+        return _Service(name=entry_cn, enabled=enabled)
+
+    def _are_services_enabled(self, services):
+        return all(s.enabled for s in services)
+
+    def status(self, hostname):
+        try:
+            search_base = self.api.Object.server.get_dn_if_exists(hostname)
+        except errors.NotFound:
+            self.api.Object.server.handle_not_found(hostname)
+
+        try:
+            entries = self.search(search_base)
+
+            services = [self._get_service(e) for e in entries]
+            self._validate_component_services(services)
+
+            return (ENABLED if self._are_services_enabled(services) else
+                    CONFIGURED)
+        except errors.EmptyResult:
+            return ABSENT
+
+    @property
+    def providers(self):
+        services_by_master = defaultdict(list)
+        entries = self.search(self.search_base)
+        for e in entries:
+            service = self._get_service(e)
+            master_cn = e.dn[1]['cn']
+
+            services_by_master[master_cn].append(service)
+
+        providers = []
+        for master in services_by_master:
+            services = services_by_master[master]
+            self._validate_component_services(services)
+            if all(s.enabled for s in services):
+                providers.append(master)
+
+        return sorted(providers)
+
+
+class MembershipBasedRole(BaseServerRole):
+    """
+    Base class for roles which are expressed as a membership of an object
+    related to IPA master (host or service) in a group.
+
+    :param member_obj: member object type, usually 'host' or 'service'
+    """
+    member_obj = None
+
+    @abc.abstractproperty
+    def group_dn(self):
+        """
+        :returns: DN of the group which contains members associated with the
+        role
+        """
+        return None
+
+    def obj_pkey_from_hostname(self, hostname):
+        """
+        given hostname, return the primary key of the member object
+        :param hostname: master FQDN
+        :returns: object primary key constructed from hostname
+        """
+        return hostname
+
+    @abc.abstractmethod
+    def fqdn_from_entry(self, entry):
+        return None
+
+    @property
+    def search_filter(self):
+        return self.ldap_conn.make_filter_from_attr(
+            "memberof",
+            self.group_dn
+        )
+
+    def status(self, hostname):
+        show_cmd = self.api.Command['{}_show'.format(self.member_obj)]
+        obj_pkey = self.obj_pkey_from_hostname(hostname)
+
+        try:
+            result = show_cmd(obj_pkey, raw=True)['result']
+        except errors.NotFound:
+            self.api.Object[self.member_obj].handle_not_found(obj_pkey)
+
+        return ENABLED if self.group_dn in result['memberof'] else ABSENT
+
+    @property
+    def providers(self):
+        try:
+            entries = self.search(self.search_base)
+            return sorted(self.fqdn_from_entry(entry) for entry in
+                          entries)
+        except errors.EmptyResult:
+            return []
+
+    @property
+    def search_base(self):
+        return DN(self.api.env['container_{}'.format(self.member_obj)],
+                  self.api.env.basedn)
+
+
+class ADTrustBasedRole(MembershipBasedRole):
+    @property
+    def group_dn(self):
+        return DN(
+            ('cn', 'adtrust agents'), ('cn', 'sysaccounts'),
+            ('cn', 'etc'), self.api.env.basedn)
+
+
+@register_role
+class ADTrustAgent(ADTrustBasedRole):
+    name = u"AD trust agent"
+    member_obj = 'host'
+
+    def fqdn_from_entry(self, entry):
+        return entry.single_value.get('fqdn')
+
+
+@register_role
+class ADTrustController(ServiceBasedRole):
+    name = u"AD trust controller"
+    component_services = {'ADTRUST'}
+
+
+@register_role
+class IPAMaster(ServiceBasedRole):
+    name = u"IPA master"
+    component_services = {'KDC', 'KPASSWD', 'HTTP'}
+
+
+@register_role
+class CA(ServiceBasedRole):
+    name = u"CA server"
+    component_services = {'CA'}
+
+
+@register_role
+class DNSServer(ServiceBasedRole):
+    name = u"DNS server"
+    component_services = {'DNS', 'DNSKeySync'}
+
+
+@register_role
+class KRA(ServiceBasedRole):
+    name = u"KRA"
+    component_services = {'KRA'}
+
+
+@register_attribute
+class DNSSecKeyMaster(BaseServerAttribute):
+    name = "DNSSec key master"
+    read_only = True
+    singular = True
+
+    associated_role = DNSServer
+    associated_service_name = "DNSSEC"
+    ipa_config_string_value = "dnssecKeyMaster"
+
+
+@register_attribute
+class CARenewalMaster(BaseServerAttribute):
+    name = "CA renewal master"
+    read_only = False
+    singular = True
+
+    associated_role = CA
+    associated_service_name = "CA"
+    ipa_config_string_value = "caRenewalMaster"
+
+
+register = Registry()
+
+
+@register()
+class serverroles(Backend):
+    """
+    This Backend can be used to query various information about server roles
+    and attributes configured in the topology.
+    """
+
+    def __init__(self, api_instance):
+        super(serverroles, self).__init__(api_instance)
+        self.roles = None
+        self.attributes = None
+
+    def _on_finalize(self):
+        self.roles = role_factory(self.api)
+        self.attributes = attribute_factory(self.api)
+
+    def _handle_obj_not_found(self, obj_type, name):
+        raise errors.NotFound(
+            reason=_(
+                "%(obj_type)s name %(name)s not found") % dict(
+                    obj_type=obj_type, name=name))
+
+    def _get_obj_store(self, obj_type="Role"):
+        if obj_type.lower() == "role":
+            return self.roles
+        elif obj_type.lower() == "attribute":
+            return self.attributes
+        else:
+            raise ValueError("Must be role or attribute")
+
+    def _get_instances(self, names, obj_type="Role"):
+        obj_store = self._get_obj_store(obj_type=obj_type)
+        for name in names:
+            key = name.lower()
+            if key not in obj_store:
+                self._handle_obj_not_found(obj_type, name)
+
+            yield obj_store[key]
+
+    def _get_providers(self, obj_names, obj_type="Role"):
+        return {obj.name: obj.providers for obj in
+                self._get_instances(obj_names, obj_type=obj_type)}
+
+    def get_role_status(self, hostname, role_names):
+        """
+        get status of a role (enabled/configured/absent) on a
+        master.
+        :param hostname: server hostname
+        :param role_names: iterable with role names
+        :returns: dictionary of role statuses keyed by role name
+        :raises: NotFound if master is not found or unknown role is encountered
+        """
+        return {role.name: role.status(hostname) for role in
+                self._get_instances(role_names, obj_type="Role")}
+
+    def find_roles(self, hostname, substr):
+        """
+        Search for roles whose name contains the substring and return the
+        status of the matched roles on the master. Returns empty dict if no
+        match is found.
+
+        :param hostname: server hostname
+        :param substr: substring to match
+        :returns: status of the matched roles as a dict
+        """
+        key = substr.lower()
+        found_roles = [name for name in self.roles if key in
+                       name]
+        if not found_roles:
+            return {}
+
+        return self.get_role_status(hostname, found_roles)
+
+    def get_role_providers(self, role_names):
+        """
+        list all masters which provide the given role, i.e. the role is marked
+        as enabled on them.
+        :param role_names: iterable with role names
+        :returns: dict of FQDNs which provide the roles keyed by role name
+        :raises: NotFound if some role name could not be found among available
+        roles
+        """
+
+        return self._get_providers(role_names, obj_type="Role")
+
+    def get_provided_roles(self, hostname):
+        """
+        get all roles enabled on a master
+        :param hostname: master FQDN
+        :returns: list of canonical names of enabled roles
+        """
+        role_statuses = self.get_role_status(hostname, self.roles.keys())
+        return sorted(name for name, status in role_statuses.items()
+                      if status == ENABLED)
+
+    def get_attribute_providers(self, attr_names):
+        """
+        list all masters with a given attribute such as DNSSec master or CA
+        renewal master
+
+        :param attr_names: iterable with attribute names
+        :returns: dictionary of master FQDNs keyed by attribute name
+        :raises: NotFound if one of the attribute names is not found
+        """
+        return self._get_providers(attr_names, obj_type="Attribute")
+
+    def set_attribute_provider(self, hostname, attr_name):
+        """
+        set an attribute (e.g. CA renewal master) on some master. Please note
+        that some attributes are not defined purely by data in LDAP and may
+        require changes in local configuration of the master. In that case the
+        function will raise an ExecutionError.
+        :param hostname: master FQDN
+        :param attr_name: attribute name
+        :returns: dictionary of original and new masters with the attribute
+        """
+        key = attr_name.lower()
+        if key not in self.attributes:
+            self._handle_obj_not_found("Attribute", attr_name)
+
+        return self.attributes[key].set_provider(hostname)
-- 
2.5.5

From 4c86efa202e232d0be81f858077356d2edc94188 Mon Sep 17 00:00:00 2001
From: Martin Babinsky <mbabi...@redhat.com>
Date: Thu, 19 May 2016 13:40:12 +0200
Subject: [PATCH 2/2] Test suite for `serverroles` backend

Tests retrieving roles/attributes and setting server attributes in various
scenarios.

https://fedorahosted.org/freeipa/ticket/5181
---
 ipatests/test_ipaserver/test_serverroles.py | 769 ++++++++++++++++++++++++++++
 1 file changed, 769 insertions(+)
 create mode 100644 ipatests/test_ipaserver/test_serverroles.py
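
Note (illustration only, not part of the patch): the status that the tests
expect for a service-based role can be derived from fixture-style data
roughly as follows. `role_status` is a hypothetical helper that takes a
plain dict of service name to enabled flag instead of LDAP entries; the
three status constants mirror those in `serverroles.py`.

```python
ENABLED, CONFIGURED, ABSENT = u'enabled', u'configured', u'absent'


def role_status(component_services, services):
    """
    Hypothetical helper mirroring the logic of ServiceBasedRole.status.

    :param component_services: set of service names the role consists of
    :param services: dict mapping service name to True if it is enabled
    """
    # keep only the services that belong to the role (the LDAP filter)
    found = {name: services[name]
             for name in component_services if name in services}
    if not found:
        return ABSENT
    if set(found) != set(component_services):
        raise ValueError("component services only partially present")
    return ENABLED if all(found.values()) else CONFIGURED


dns = {'DNS', 'DNSKeySync'}
status_enabled = role_status(
    dns, {'DNS': True, 'DNSKeySync': True, 'CA': True})
status_configured = role_status(dns, {'DNS': True, 'DNSKeySync': False})
status_absent = role_status(dns, {'CA': True})
```

A role counts as enabled only when every component service is enabled,
which is why the fixtures list both `DNS` and `DNSKeySync` for DNS servers.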

diff --git a/ipatests/test_ipaserver/test_serverroles.py b/ipatests/test_ipaserver/test_serverroles.py
new file mode 100644
index 0000000000000000000000000000000000000000..25d6e3a979dc90cae697c0f835b421c811074aff
--- /dev/null
+++ b/ipatests/test_ipaserver/test_serverroles.py
@@ -0,0 +1,769 @@
+#
+# Copyright (C) 2015 FreeIPA Contributors see COPYING for license
+#
+
+"""
+Tests for the serverroles backend
+"""
+
+from collections import namedtuple
+
+import ldap
+import pytest
+
+from ipalib import api, create_api, errors
+from ipapython.dn import DN
+from ipaserver.plugins.ldap2 import ldap2
+from ipaserver.plugins.serverroles import serverroles
+
+from ipatests.util import MockLDAP
+
+
+def _make_service_entry_mods(enabled=True, other_config=None):
+    mods = {
+        b'objectClass': [b'top', b'nsContainer', b'ipaConfigObject'],
+    }
+    if enabled:
+        mods.update({b'ipaConfigString': [b'enabledService']})
+
+    if other_config is not None:
+        mods.setdefault(b'ipaConfigString', [])
+        mods[b'ipaConfigString'].extend(other_config)
+
+    return mods
+
+
+def _make_master_entry_mods(ca=False):
+    mods = {
+        b'objectClass': [
+            b'top',
+            b'nsContainer',
+            b'ipaReplTopoManagedServer',
+            b'ipaSupportedDomainLevelConfig',
+            b'ipaConfigObject',
+        ],
+        b'ipaMaxDomainLevel': [b'1'],
+        b'ipaMinDomainLevel': [b'0'],
+        b'ipaReplTopoManagedsuffix': [str(api.env.basedn)]
+    }
+    if ca:
+        mods[b'ipaReplTopoManagedsuffix'].append(b'o=ipaca')
+
+    return mods
+
+_adtrust_agents = DN(
+    ('cn', 'adtrust agents'),
+    ('cn', 'sysaccounts'),
+    ('cn', 'etc'),
+    api.env.basedn
+)
+
+
+master_data = {
+    'ca-dns-dnssec-keymaster': {
+        'services': {
+            'CA': {
+                'enabled': True,
+            },
+            'DNS': {
+                'enabled': True,
+            },
+            'DNSKeySync': {
+                'enabled': True,
+            },
+            'DNSSEC': {
+                'enabled': True,
+                'config': ['DNSSecKeyMaster']
+            }
+        },
+        'expected_roles': {
+            'enabled': ['IPA master', 'CA server', 'DNS server']
+        },
+        'expected_attributes': ['DNSSec key master']
+    },
+    'ca-kra-renewal-master': {
+        'services': {
+            'CA': {
+                'enabled': True,
+                'config': ['caRenewalMaster']
+            },
+            'KRA': {
+                'enabled': True,
+            },
+        },
+        'expected_roles': {
+            'enabled': ['IPA master', 'CA server', 'KRA']
+        },
+        'expected_attributes': ['CA renewal master']
+    },
+    'dns-trust-agent': {
+        'services': {
+            'DNS': {
+                'enabled': True,
+            },
+            'DNSKeySync': {
+                'enabled': True,
+            }
+        },
+        'attributes': {
+            _adtrust_agents: {
+                'member': ['host']
+            }
+        },
+        'expected_roles': {
+            'enabled': ['IPA master', 'DNS server', 'AD trust agent']
+        }
+    },
+    'trust-agent': {
+        'attributes': {
+            _adtrust_agents: {
+                'member': ['host']
+            }
+        },
+        'expected_roles': {
+            'enabled': ['IPA master', 'AD trust agent']
+        }
+    },
+    'trust-controller-dns': {
+        'services': {
+            'ADTRUST': {
+                'enabled': True,
+            },
+            'DNS': {
+                'enabled': True,
+            },
+            'DNSKeySync': {
+                'enabled': True,
+            }
+        },
+        'attributes': {
+            _adtrust_agents: {
+                'member': ['host', 'cifs']
+            }
+        },
+        'expected_roles': {
+            'enabled': ['IPA master', 'AD trust agent', 'AD trust controller',
+                        'DNS server']
+        }
+    },
+    'trust-controller-ca': {
+        'services': {
+            'ADTRUST': {
+                'enabled': True,
+            },
+            'CA': {
+                'enabled': True,
+            },
+        },
+        'attributes': {
+            _adtrust_agents: {
+                'member': ['host', 'cifs']
+            }
+        },
+        'expected_roles': {
+            'enabled': ['IPA master', 'AD trust agent', 'AD trust controller',
+                        'CA server']
+        }
+    },
+    'configured-ca': {
+        'services': {
+            'CA': {
+                'enabled': False,
+            },
+        },
+        'expected_roles': {
+            'enabled': ['IPA master'],
+            'configured': ['CA server']
+        }
+    },
+    'configured-dns': {
+        'services': {
+            'DNS': {
+                'enabled': False,
+            },
+            'DNSKeySync': {
+                'enabled': False,
+            }
+        },
+        'expected_roles': {
+            'enabled': ['IPA master'],
+            'configured': ['DNS server']
+        }
+    },
+    'mixed-state-dns': {
+        'services': {
+            'DNS': {
+                'enabled': False
+            },
+            'DNSKeySync': {
+                'enabled': True
+            }
+        },
+        'expected_roles': {
+            'enabled': ['IPA master'],
+            'configured': ['DNS server']
+        }
+    },
+}
+
+
+class MockMasterTopology(object):
+    """
+    Object that sets up and tears down entries in the LDAP backend to mimic
+    the presence of real IPA masters with services running on them.
+    """
+
+    ipamaster_services = [u'KDC', u'HTTP', u'KPASSWD']
+
+    def __init__(self, api_instance, domain_data):
+        self.api = api_instance
+
+        self.domain = self.api.env.domain
+        self.domain_data = domain_data
+        self.masters_base = DN(
+            self.api.env.container_masters, self.api.env.basedn)
+
+        self.test_master_dn = DN(
+            ('cn', self.api.env.host), self.api.env.container_masters,
+            self.api.env.basedn)
+
+        self.ldap = MockLDAP()
+
+        self.existing_masters = {
+            m['cn'][0] for m in self.api.Command.server_find(
+                u'', sizelimit=0)['result']}
+
+        self.existing_attributes = self._check_test_host_attributes()
+
+    def iter_domain_data(self):
+        MasterData = namedtuple('MasterData',
+                                ['dn', 'fqdn', 'services', 'attrs'])
+        for name in self.domain_data:
+            fqdn = self.get_fqdn(name)
+            master_dn = self.get_master_dn(name)
+            master_services = self.domain_data[name].get('services', {})
+
+            master_attributes = self.domain_data[name].get('attributes', {})
+
+            yield MasterData(
+                dn=master_dn,
+                fqdn=fqdn,
+                services=master_services,
+                attrs=master_attributes
+            )
+
+    def get_fqdn(self, name):
+        return '.'.join([name, self.domain])
+
+    def get_master_dn(self, name):
+        return DN(('cn', self.get_fqdn(name)), self.masters_base)
+
+    def get_service_dn(self, name, master_dn):
+        return DN(('cn', name), master_dn)
+
+    def _add_host_entry(self, fqdn):
+        try:
+            self.api.Command.host_add(fqdn, force=True)
+            self.api.Command.hostgroup_add_member(u'ipaservers', host=fqdn)
+        except errors.DuplicateEntry:
+            pass
+
+    def _del_host_entry(self, fqdn):
+        try:
+            self.api.Command.host_del(fqdn)
+        except errors.NotFound:
+            pass
+
+    def _add_service_entry(self, service, fqdn):
+        return self.api.Command.service_add(
+            '/'.join([service, fqdn]),
+            force=True
+        )
+
+    def _del_service_entry(self, service, fqdn):
+        try:
+            self.api.Command.service_del(
+                '/'.join([service, fqdn]),
+            )
+        except errors.NotFound:
+            pass
+
+    def _add_svc_entries(self, master_dn, svc_desc):
+        self._add_ipamaster_services(master_dn)
+        for name in svc_desc:
+            svc_dn = self.get_service_dn(name, master_dn)
+            svc_mods = svc_desc[name]
+
+            self.ldap.add_entry(
+                str(svc_dn),
+                _make_service_entry_mods(
+                    enabled=svc_mods['enabled'],
+                    other_config=svc_mods.get('config', None)))
+
+    def _remove_svc_master_entries(self, master_dn):
+        try:
+            entries = self.ldap.connection.search_s(
+                str(master_dn), ldap.SCOPE_SUBTREE
+            )
+        except ldap.NO_SUCH_OBJECT:
+            return
+
+        if entries:
+            entries.sort(key=lambda x: len(x[0]), reverse=True)
+            for entry_dn, attrs in entries:
+                self.ldap.del_entry(str(entry_dn))
+
+    def _add_ipamaster_services(self, master_dn):
+        """
+        Add all the service entries that are part of the IPA master role.
+        """
+        for svc_name in self.ipamaster_services:
+            svc_dn = self.get_service_dn(svc_name, master_dn)
+            self.ldap.add_entry(str(svc_dn), _make_service_entry_mods())
+
+    def _add_members(self, dn, fqdn, member_attrs):
+        entry, attrs = self.ldap.connection.search_s(
+            str(dn), ldap.SCOPE_SUBTREE)[0]
+        mods = []
+        value = attrs.get('member', [])
+        mod_op = ldap.MOD_REPLACE
+        if not value:
+            mod_op = ldap.MOD_ADD
+
+        for a in member_attrs:
+
+            if a == 'host':
+                value.append(
+                    str(self.api.Object.host.get_dn(fqdn)))
+            else:
+                result = self._add_service_entry(a, fqdn)['result']
+                value.append(str(result['dn']))
+
+        mods.append(
+            (mod_op, 'member', value)
+        )
+
+        self.ldap.connection.modify_s(str(dn), mods)
+
+    # TODO: the explicit member removal may not be necessary since the
+    # referint plugin should take care of it
+    def _remove_members(self, dn, fqdn, member_attrs):
+        entry, attrs = self.ldap.connection.search_s(
+            str(dn), ldap.SCOPE_SUBTREE)[0]
+        mods = []
+        # Build the member set once, outside the loop, so that removals
+        # accumulate instead of being reset on every iteration.
+        value = set(attrs.get('member', []))
+        for a in member_attrs:
+
+            if a == 'host':
+                try:
+                    value.remove(
+                        str(self.api.Object.host.get_dn(fqdn)))
+                except KeyError:
+                    pass
+            else:
+                try:
+                    value.remove(
+                        str(self.api.Object.service.get_dn(
+                            '/'.join([a, fqdn]))))
+                except KeyError:
+                    pass
+                self._del_service_entry(a, fqdn)
+
+        mods.append(
+            (ldap.MOD_REPLACE, 'member', list(value))
+        )
+
+        try:
+            self.ldap.connection.modify_s(str(dn), mods)
+        except (ldap.NO_SUCH_OBJECT, ldap.NO_SUCH_ATTRIBUTE):
+            pass
+
+    def _check_test_host_attributes(self):
+        existing_attributes = set()
+
+        for service, value, attr_name in (
+                ('CA', 'caRenewalMaster', 'ca renewal master'),
+                ('DNSSEC', 'DNSSecKeyMaster', 'dnssec key master')):
+
+            svc_dn = DN(('cn', service), self.test_master_dn)
+            try:
+                svc_entry = self.api.Backend.ldap2.get_entry(svc_dn)
+            except errors.NotFound:
+                continue
+            else:
+                config_string_val = svc_entry.get('ipaConfigString', [])
+
+                if value in config_string_val:
+                    existing_attributes.add(attr_name)
+
+        return existing_attributes
+
+    def _remove_ca_renewal_master(self):
+        if 'ca renewal master' not in self.existing_attributes:
+            return
+
+        ca_dn = DN(('cn', 'CA'), self.test_master_dn)
+        ca_entry = self.api.Backend.ldap2.get_entry(ca_dn)
+
+        config_string_val = ca_entry.get('ipaConfigString', [])
+        try:
+            config_string_val.remove('caRenewalMaster')
+        except ValueError:
+            return
+
+        ca_entry.update({'ipaConfigString': config_string_val})
+        self.api.Backend.ldap2.update_entry(ca_entry)
+
+    def _restore_ca_renewal_master(self):
+        if 'ca renewal master' not in self.existing_attributes:
+            return
+
+        ca_dn = DN(('cn', 'CA'), self.test_master_dn)
+        ca_entry = self.api.Backend.ldap2.get_entry(ca_dn)
+
+        config_string_val = ca_entry.get('ipaConfigString', [])
+        config_string_val.append('caRenewalMaster')
+
+        ca_entry.update({'ipaConfigString': config_string_val})
+        self.api.Backend.ldap2.update_entry(ca_entry)
+
+    def setup_data(self):
+        self._remove_ca_renewal_master()
+        for master_data in self.iter_domain_data():
+            # create host
+            self._add_host_entry(master_data.fqdn)
+
+            # create master entry first
+            self.ldap.add_entry(
+                str(master_data.dn), _make_master_entry_mods(
+                    ca='CA' in master_data.services))
+
+            # now add service entries
+            self._add_svc_entries(master_data.dn, master_data.services)
+
+            # optionally add some attributes required e.g. by AD trust roles
+            for entry_dn, attrs in master_data.attrs.items():
+                if 'member' in attrs:
+                    self._add_members(
+                        entry_dn,
+                        master_data.fqdn,
+                        attrs['member']
+                    )
+
+    def teardown_data(self):
+        self._restore_ca_renewal_master()
+        for master_data in self.iter_domain_data():
+            # first remove the master entries and service containers
+            self._remove_svc_master_entries(master_data.dn)
+
+            # optionally clean up leftover attributes
+            for entry_dn, attrs in master_data.attrs.items():
+                if 'member' in attrs:
+                    self._remove_members(
+                        entry_dn,
+                        master_data.fqdn,
+                        attrs['member'],
+                    )
+
+            # finally remove host entry
+            self._del_host_entry(master_data.fqdn)
+
+
+@pytest.fixture(scope='module')
+def mock_api(request):
+    test_api = create_api(mode=None)
+    test_api.bootstrap(in_server=True, in_tree=True)
+    test_api.add_plugin(ldap2)
+    test_api.add_plugin(serverroles)
+    test_api.finalize()
+
+    if not test_api.Backend.ldap2.isconnected():
+        test_api.Backend.ldap2.connect()
+
+    return test_api
+
+
+@pytest.fixture(scope='module')
+def mock_masters(request, mock_api):
+    """
+    Populate the LDAP backend with test data
+    """
+
+    if not api.Backend.rpcclient.isconnected():
+        api.Backend.rpcclient.connect()
+
+    master_topo = MockMasterTopology(mock_api, master_data)
+
+    def finalize():
+        master_topo.teardown_data()
+        if api.Backend.rpcclient.isconnected():
+            api.Backend.rpcclient.disconnect()
+
+    request.addfinalizer(finalize)
+
+    master_topo.setup_data()
+
+    return master_topo
+
+
+def enabled_role_iter(master_data):
+    for m in master_data:
+        for role in master_data[m]['expected_roles']['enabled']:
+            yield m, role
+
+
+def provided_role_iter(master_data):
+    for m in master_data:
+        yield m, master_data[m]['expected_roles']['enabled']
+
+
+def configured_role_iter(master_data):
+    for m in master_data:
+        if 'configured' in master_data[m]['expected_roles']:
+            for role in master_data[m]['expected_roles']['configured']:
+                yield m, role
+
+
+def role_provider_iter(master_data):
+    result = {}
+    for m in master_data:
+        for role in master_data[m]['expected_roles']['enabled']:
+            if role not in result:
+                result[role] = []
+
+            result[role].append(m)
+
+    for role_name, masters in result.items():
+        yield role_name, masters
+
+
+def attribute_masters_iter(master_data):
+    for m in master_data:
+        if 'expected_attributes' in master_data[m]:
+            for attr in master_data[m]['expected_attributes']:
+                yield m, attr
+
+
+def dns_servers_iter(master_data):
+    for m in master_data:
+        if "DNS server" in master_data[m]['expected_roles']['enabled']:
+            yield m
+
+
+@pytest.fixture(params=list(enabled_role_iter(master_data)),
+                ids=['role: {}, master: {}, enabled'.format(role, m)
+                     for m, role in enabled_role_iter(master_data)])
+def enabled_role(request):
+    return request.param
+
+
+@pytest.fixture(params=list(provided_role_iter(master_data)),
+                ids=["{}: {}".format(m, ', '.join(roles)) for m, roles in
+                     provided_role_iter(master_data)])
+def provided_roles(request):
+    return request.param
+
+
+@pytest.fixture(params=list(configured_role_iter(master_data)),
+                ids=['role: {}, master: {}, configured'.format(role, m)
+                     for m, role in configured_role_iter(master_data)])
+def configured_role(request):
+    return request.param
+
+
+@pytest.fixture(params=list(role_provider_iter(master_data)),
+                ids=['{} providers'.format(role_name)
+                     for role_name, m in
+                     role_provider_iter(master_data)])
+def role_providers(request):
+    return request.param
+
+
+@pytest.fixture(params=list(attribute_masters_iter(master_data)),
+                ids=['{}: {}'.format(attr, m) for m, attr in
+                     attribute_masters_iter(master_data)])
+def attribute_providers(request):
+    return request.param
+
+
+@pytest.fixture(params=list(dns_servers_iter(master_data)),
+                ids=list(dns_servers_iter(master_data)))
+def dns_server(request):
+    return request.param
+
+
+class TestServerRoleStatusRetrieval(object):
+    def get_role_status(self, master, role, mock_api, mock_masters):
+        fqdn = mock_masters.get_fqdn(master)
+        return mock_api.Backend.serverroles.get_role_status(fqdn, role)
+
+    def get_role_providers(self, role_name, mock_api, mock_masters):
+        result = mock_api.Backend.serverroles.get_role_providers(
+            [role_name])[role_name]
+
+        return [r for r in result if r not in mock_masters.existing_masters]
+
+    def get_provided_roles(self, master, mock_api, mock_masters):
+        fqdn = mock_masters.get_fqdn(master)
+        return mock_api.Backend.serverroles.get_provided_roles(fqdn)
+
+    def test_listing_of_enabled_role(
+            self, mock_api, mock_masters, enabled_role):
+        master, role_name = enabled_role
+        result = self.get_role_status(master, [role_name], mock_api,
+                                      mock_masters)
+        assert result[role_name] == u'enabled'
+
+    def test_listing_of_configured_role(
+            self, mock_api, mock_masters, configured_role):
+        master, role_name = configured_role
+        result = self.get_role_status(master, [role_name], mock_api,
+                                      mock_masters)
+        assert result[role_name] == u'configured'
+
+    def test_role_providers(
+            self, mock_api, mock_masters, role_providers):
+        role_name, providers = role_providers
+        expected_masters = sorted(mock_masters.get_fqdn(m) for m in providers)
+
+        actual_masters = self.get_role_providers(
+            role_name, mock_api, mock_masters)
+
+        assert expected_masters == actual_masters
+
+    def test_provided_roles(self, mock_api, mock_masters, provided_roles):
+        master, expected_roles = provided_roles
+        expected_roles = sorted(expected_roles)
+        actual_roles = self.get_provided_roles(master, mock_api, mock_masters)
+        assert expected_roles == actual_roles
+
+    def test_unknown_role_status_raises_notfound(self, mock_api, mock_masters):
+        unknown_roles = ['IAP maestr']
+        fqdn = mock_masters.get_fqdn('ca-dns-dnssec-keymaster')
+        with pytest.raises(errors.NotFound):
+            mock_api.Backend.serverroles.get_role_status(fqdn, unknown_roles)
+
+    def test_unknown_role_among_known_ones_raises_notfound(self, mock_api,
+                                                           mock_masters):
+        mixed_roles = ['IAP maestr', 'CA server']
+        fqdn = mock_masters.get_fqdn('ca-dns-dnssec-keymaster')
+        with pytest.raises(errors.NotFound):
+            mock_api.Backend.serverroles.get_role_status(fqdn, mixed_roles)
+
+    def test_role_status_on_invalid_fqdn_raises_notfound(self, mock_api):
+        with pytest.raises(errors.NotFound):
+            mock_api.Backend.serverroles.get_role_status(
+                'dwqfewe.fwaergr.aerg', ['IPA master'])
+
+    def test_substring_search_for_dns_server(self, mock_api, mock_masters,
+                                             dns_server):
+        substring = 'dns'
+        dns_fqdn = mock_masters.get_fqdn(dns_server)
+
+        result = mock_api.Backend.serverroles.find_roles(dns_fqdn, substring)
+
+        assert result['DNS server'] == u'enabled'
+
+    def test_empty_substring_queries_all_roles_on_server(self, mock_api,
+                                                         mock_masters):
+        master_name = 'ca-dns-dnssec-keymaster'
+        fqdn = mock_masters.get_fqdn(master_name)
+        result = mock_api.Backend.serverroles.find_roles(fqdn, '')
+
+        for r in result:
+            if r in master_data[master_name]['expected_roles']['enabled']:
+                assert result[r] == u'enabled'
+            else:
+                assert result[r] == u'absent'
+
+    def test_invalid_substring_search_returns_nothing(self, mock_api,
+                                                      mock_masters):
+        invalid_substr = 'fwfgbb'
+        fqdn = mock_masters.get_fqdn('ca-dns-dnssec-keymaster')
+
+        assert (
+            mock_api.Backend.serverroles.find_roles(fqdn, invalid_substr) == {}
+        )
+
+    def test_substring_search_on_invalid_fqdn_raises_notfound(self, mock_api):
+        with pytest.raises(errors.NotFound):
+            mock_api.Backend.serverroles.find_roles('wefwe.fWGRHJT.fwev',
+                                                    'IPA master')
+
+
+class TestServerAttributes(object):
+    def get_attr_providers(self, attr_name, mock_api, mock_masters):
+        result = mock_api.Backend.serverroles.get_attribute_providers(
+            [attr_name])
+        return result
+
+    def test_attribute_providers(self, mock_api, mock_masters,
+                                 attribute_providers):
+        master, attr_name = attribute_providers
+        fqdn = mock_masters.get_fqdn(master)
+        actual_attr_masters = self.get_attr_providers(
+            attr_name, mock_api, mock_masters)
+
+        assert actual_attr_masters[attr_name] == [fqdn]
+
+    def test_set_readonly_attribute_raises_executionerror(self, mock_api,
+                                                          mock_masters):
+        fqdn = mock_masters.get_fqdn('dns-trust-agent')
+        attr_name = 'DNSSec key master'
+        with pytest.raises(errors.ReadOnlyServerAttribute):
+            mock_api.Backend.serverroles.set_attribute_provider(
+                fqdn, attr_name)
+
+    def test_set_attribute_on_provider_raises_error(
+            self, mock_api, mock_masters):
+        attr_name = "CA renewal master"
+        backend = mock_api.Backend.serverroles
+        existing_renewal_master = backend.get_attribute_providers(
+            [attr_name])[attr_name][0]
+
+        with pytest.raises(errors.MasterAlreadyProvidesRole):
+            backend.set_attribute_provider(existing_renewal_master, attr_name)
+
+    def test_set_attribute_on_master_without_assoc_role_raises_error(
+            self, mock_api, mock_masters):
+        attr_name = "CA renewal master"
+        backend = mock_api.Backend.serverroles
+        non_ca_fqdn = mock_masters.get_fqdn('trust-controller-dns')
+
+        with pytest.raises(errors.MasterDoesNotProvideRole):
+            backend.set_attribute_provider(non_ca_fqdn, attr_name)
+
+    def test_set_unknown_attribute_on_master_raises_notfound(self, mock_api,
+                                                             mock_masters):
+        attr_name = "CA renuwal maztah"
+        backend = mock_api.Backend.serverroles
+        fqdn = mock_masters.get_fqdn('trust-controller-ca')
+
+        with pytest.raises(errors.NotFound):
+            backend.set_attribute_provider(fqdn, attr_name)
+
+    def test_set_attribute_on_invalid_fqdn_raises_notfound(self, mock_api,
+                                                           mock_masters):
+        attr_name = "CA renewal master"
+        backend = mock_api.Backend.serverroles
+        fqdn = 'weqfgrh.oadj5.gebbvyu'
+
+        with pytest.raises(errors.NotFound):
+            backend.set_attribute_provider(fqdn, attr_name)
+
+    def test_set_ca_renewal_master_on_other_ca_and_back(self, mock_api,
+                                                        mock_masters):
+        attr_name = "CA renewal master"
+        backend = mock_api.Backend.serverroles
+        original_renewal_master = backend.get_attribute_providers(
+            [attr_name])[attr_name][0]
+
+        other_ca_server = mock_masters.get_fqdn('trust-controller-ca')
+
+        for host in (other_ca_server, original_renewal_master):
+            backend.set_attribute_provider(host, attr_name)
+
+            assert (backend.get_attribute_providers(
+                [attr_name])[attr_name] == [host])
-- 
2.5.5
