How this works:
  1. When a trusted domain user is tested, AD GC is searched
     for the user entry Distinguished Name
  2. The user entry is then read from AD GC and its SID and SIDs
     of all its assigned groups (tokenGroups attribute) are retrieved
  3. The SIDs are then used to search IPA LDAP database to find
     all external groups which have any of these SIDs as external
     members
  4. All these groups having these groups as direct or indirect
     members are added to hbactest allowing it to perform the search

LIMITATIONS:
- user SID in hbactest --user parameter is not supported
- only Trusted Admins group members can use this function as it
  uses secret for IPA-Trusted domain link
- List of group SIDs does not contain group memberships outside
  of the trusted domain

https://fedorahosted.org/freeipa/ticket/2997

------------------------

There are also 2 patches changing current dcerpc.py code to make it usable both for group-add-member trusted domain user resolution and also for purposes of the trusted domain user hbactest.

Example of the new hbactest ability:
# ipa hbacrule-show can_login
  Rule name: can_login
  Host category: all
  Source host category: all
  Enabled: TRUE
  User Groups: admins, ad_test_admins
  Services: login, sshd
# ipa group-show ad_test_admins
  Group name: ad_test_admins
  Description: AD.TEST admins
  GID: 179000011
  Member groups: ext_all_admins
  Member of HBAC rule: can_login
# ipa group-show ext_all_admins
  Group name: ext_all_admins
  Description: All AD.TEST admins
  Member of groups: ad_test_admins
  Indirect Member of HBAC rule: can_login
  External member: S-1-5-21-3035198329-144811719-1378114514-512

# ipa hbactest --user='AD\Administrator' --host=`hostname` --service=sshd
--------------------
Access granted: True
--------------------
  Matched rules: can_login

There may still be dragons, I am sending what I have now, still need to run more tests.

Martin
From 64e719de744fad61f60e4ddeb4927f6a2b568b81 Mon Sep 17 00:00:00 2001
From: Martin Kosek <mko...@redhat.com>
Date: Fri, 18 Jan 2013 17:28:39 +0100
Subject: [PATCH 1/3] Generalize AD GC search

Modify access methods to AD GC so that callers can specify a custom
basedn, filter, scope and attribute list, thus allowing it to perform
any LDAP search.

Error checking methodology in these functions was changed, so that it
rather raises an exception with a desription instead of simply returning
a None or False value which would made an investigation why something
does not work much more difficult. External membership method in
group-add-member command was updated to match this approach.

https://fedorahosted.org/freeipa/ticket/2997
---
 ipalib/plugins/group.py |   9 ++--
 ipaserver/dcerpc.py     | 121 +++++++++++++++++++++++++++++++-----------------
 2 files changed, 83 insertions(+), 47 deletions(-)

diff --git a/ipalib/plugins/group.py b/ipalib/plugins/group.py
index f86b134e61fc8c7518a64d25329babee3398c6ef..347a7ee9fda9cb574f433dff3a9621d8bffee887 100644
--- a/ipalib/plugins/group.py
+++ b/ipalib/plugins/group.py
@@ -384,11 +384,12 @@ class group_add_member(LDAPAddMember):
                 if domain_validator.is_trusted_sid_valid(sid):
                     sids.append(sid)
                 else:
-                    actual_sid = domain_validator.get_sid_trusted_domain_object(sid)
-                    if isinstance(actual_sid, unicode):
-                        sids.append(actual_sid)
+                    try:
+                        actual_sid = domain_validator.get_trusted_domain_object_sid(sid)
+                    except errors.PublicError, e:
+                        failed_sids.append((sid, unicode(e)))
                     else:
-                        failed_sids.append((sid, 'Not a trusted domain SID'))
+                        sids.append(actual_sid)
             if len(sids) == 0:
                 raise errors.ValidationError(name=_('external member'),
                                              error=_('values are not recognized as valid SIDs from trusted domain'))
diff --git a/ipaserver/dcerpc.py b/ipaserver/dcerpc.py
index 54a70defc9df52db58054d29c1c9f9189a88cabb..ef937db048a69323fae59687b5c406424add1a03 100644
--- a/ipaserver/dcerpc.py
+++ b/ipaserver/dcerpc.py
@@ -185,41 +185,77 @@ class DomainValidator(object):
                 return True
         return False
 
-    def get_sid_trusted_domain_object(self, object_name):
+
+    def get_trusted_domain_objects(self, domain=None, flatname=None, filter="",
+            attrs=None, scope=_ldap.SCOPE_SUBTREE, basedn=None):
+        """
+        Search for LDAP objects in a trusted domain specified either by `domain'
+        or `flatname'. The actual LDAP search is specified by `filter', `attrs',
+        `scope' and `basedn'. When `basedn' is empty, database root DN is used.
+        """
+        assert domain is not None or flatname is not None
         """Returns SID for the trusted domain object (user or group only)"""
         if not self.domain:
             # our domain is not configured or self.is_configured() never run
-            return None
+            raise errors.ValidationError(name=_('Trust setup'),
+                error=_('Our domain is not configured'))
         if not self._domains:
             self._domains = self.get_trusted_domains()
         if len(self._domains) == 0:
             # Our domain is configured but no trusted domains are configured
-            return None
+            raise errors.ValidationError(name=_('Trust setup'),
+                error=_('No trusted domain is not configured'))
 
-        components = normalize_name(object_name)
-        if not ('domain' in components or 'flatname' in components):
-            # No domain or realm specified, ambiguous search
-            return False
-
-        entry = None
-        if 'domain' in components and components['domain'] in self._domains:
+        entries = None
+        if domain is not None:
+            if domain not in self._domains:
+                raise errors.ValidationError(name=_('trusted domain object'),
+                   error= _('domain is not trusted'))
             # Now we have a name to check against our list of trusted domains
-            entry = self.resolve_against_gc(components['domain'], components['name'])
-        elif 'flatname' in components:
+            entries = self.search_in_gc(domain, filter, attrs, scope, basedn)
+        elif flatname is not None:
             # Flatname was specified, traverse through the list of trusted
             # domains first to find the proper one
+            found_flatname = False
             for domain in self._domains:
-                if self._domains[domain][0] == components['flatname']:
-                    entry = self.resolve_against_gc(domain, components['name'])
-                    if entry:
+                if self._domains[domain][0] == flatname:
+                    found_flatname = True
+                    entries = self.search_in_gc(domain, filter, attrs, scope, basedn)
+                    if entries:
                         break
-        if entry:
-            try:
-                test_sid = security.dom_sid(entry)
-                return unicode(test_sid)
-            except TypeError, e:
-                return False
-        return False
+            if not found_flatname:
+                raise errors.ValidationError(name=_('trusted domain object'),
+                        error= _('no trusted domain matched the specified flat name'))
+        if not entries:
+            raise errors.NotFound(reason=_('trusted domain object not found'))
+
+        return entries
+
+    def get_trusted_domain_object_sid(self, object_name):
+        components = normalize_name(object_name)
+        if not ('domain' in components or 'flatname' in components):
+            # No domain or realm specified, ambiguous search
+             raise errors.ValidationError(name=_('trusted domain object'),
+                   error= _('Ambiguous search, user domain was not specified'))
+
+        attrs = ['objectSid']
+        filter = '(&(sAMAccountName=%(name)s)(|(objectClass=user)(objectClass=group)))' \
+                % dict(name=components['name'])
+        scope = _ldap.SCOPE_SUBTREE
+        entries = self.get_trusted_domain_objects(components.get('domain'),
+                components.get('flatname'), filter, attrs, scope)
+
+        if len(entries) > 1:
+            # Treat non-unique entries as invalid
+            raise errors.ValidationError(name=_('trusted domain object'),
+               error= _('Trusted domain did not return a unique object'))
+        sid = self.__sid_to_str(entries[0][1]['objectSid'][0])
+        try:
+            test_sid = security.dom_sid(sid)
+            return unicode(test_sid)
+        except TypeError, e:
+            raise errors.ValidationError(name=_('trusted domain object'),
+               error= _('Trusted domain did not return a valid SID for the object'))
 
     def __sid_to_str(self, sid):
         """
@@ -272,36 +308,33 @@ class DomainValidator(object):
                         dict(domain=info['dns_domain'],message=stderr.strip()))
             return (None, None)
 
-    def resolve_against_gc(self, domain, name):
+    def search_in_gc(self, domain, filter, attrs, scope, basedn=None):
         """
-        Resolves `name' against trusted domain `domain' using Global Catalog
-        Returns SID of the `name' or None
+        Perform LDAP search in a trusted domain `domain' Global Catalog.
+        Returns resulting entries or None
         """
-        entry = None
+        entries = None
         sid = None
         info = self.__retrieve_trusted_domain_gc_list(domain)
         if not info:
-            return None
+             raise errors.ValidationError(name=_('Trust setup'),
+                error=_('Cannot retrieve trusted domain GC list'))
         for (host, port) in info['gc']:
-            entry = self.__resolve_against_gc(info, host, port, name)
-            if entry:
+            entries = self.__search_in_gc(info, host, port, filter, attrs, scope, basedn)
+            if entries:
                 break
 
-        if entry:
-            l = len(entry)
-            if l > 2:
-                # Treat non-unique entries as invalid
-                return None
-            sid = self.__sid_to_str(entry[0][1]['objectSid'][0])
-        return sid
+        return entries
 
-    def __resolve_against_gc(self, info, host, port, name):
+    def __search_in_gc(self, info, host, port, filter, attrs, scope, basedn=None):
         """
-        Actual resolution against LDAP server, using SASL GSSAPI authentication
+        Actual search in AD LDAP server, using SASL GSSAPI authentication
         Returns LDAP result or None
         """
         conn = IPAdmin(host=host, port=port)
         auth = self.__extract_trusted_auth(info)
+        if attrs is None:
+            attrs = []
         if auth:
             (ccache_name, principal) = self.__kinit_as_trusted_account(info, auth)
             if ccache_name:
@@ -314,13 +347,15 @@ class DomainValidator(object):
                 # records pointing back to the same host name
                 conn.set_option(_ldap.OPT_X_SASL_NOCANON, _ldap.OPT_ON)
                 conn.sasl_interactive_bind_s(None, sasl_auth)
-                base = DN(*map(lambda p: ('dc', p), info['dns_domain'].split('.')))
+                if basedn is None:
+                    # Use domain root base DN
+                    basedn = DN(*map(lambda p: ('dc', p), info['dns_domain'].split('.')))
                 # We don't use conn.getEntry() because it will attempt to fetch schema from GC and that will fail
-                filterstr = conn.encode('(&(sAMAccountName=%(name)s)(|(objectClass=user)(objectClass=group)))' % dict(name=name))
-                attrlist = conn.encode(['sAMAccountName', 'sAMAccountType', 'objectSid', 'groupType', 'description'])
-                entry = conn.conn.search_s(str(base), _ldap.SCOPE_SUBTREE, filterstr, attrlist, 0)
+                filterstr = conn.encode(filter)
+                attrlist = conn.encode(attrs)
+                entries = conn.conn.search_s(str(basedn), scope, filterstr, attrlist, 0)
                 os.environ["KRB5CCNAME"] = old_ccache
-                return entry
+                return entries
 
     def __retrieve_trusted_domain_gc_list(self, domain):
         """
-- 
1.7.11.7

From 21edf01084bfdcf8df7136591bed68a320e80749 Mon Sep 17 00:00:00 2001
From: Martin Kosek <mko...@redhat.com>
Date: Fri, 18 Jan 2013 17:33:19 +0100
Subject: [PATCH 2/3] Do not hide SID resolver error in group-add-member

When group-add-member does not receive any resolved trusted domain
object SID, it raises an exception which hides any useful error
message passed by underlying resolution methods. Remove the exception
to reveal this error messages to user.

https://fedorahosted.org/freeipa/ticket/2997
---
 ipalib/plugins/group.py | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/ipalib/plugins/group.py b/ipalib/plugins/group.py
index 347a7ee9fda9cb574f433dff3a9621d8bffee887..19404c6fad4b5ee21c1e00d2e799d552601fa29e 100644
--- a/ipalib/plugins/group.py
+++ b/ipalib/plugins/group.py
@@ -390,9 +390,6 @@ class group_add_member(LDAPAddMember):
                         failed_sids.append((sid, unicode(e)))
                     else:
                         sids.append(actual_sid)
-            if len(sids) == 0:
-                raise errors.ValidationError(name=_('external member'),
-                                             error=_('values are not recognized as valid SIDs from trusted domain'))
             restore = []
             if 'member' in failed and 'group' in failed['member']:
                 restore = failed['member']['group']
-- 
1.7.11.7

From dd89a6e4471c1cc16f60aa482faa5610b66f1a95 Mon Sep 17 00:00:00 2001
From: Martin Kosek <mko...@redhat.com>
Date: Fri, 18 Jan 2013 17:38:15 +0100
Subject: [PATCH 3/3] Add support for AD users to hbactest command

How this works:
  1. When a trusted domain user is tested, AD GC is searched
     for the user entry Distinguished Name
  2. The user entry is then read from AD GC and its SID and SIDs
     of all its assigned groups (tokenGroups attribute) are retrieved
  3. The SIDs are then used to search IPA LDAP database to find
     all external groups which have any of these SIDs as external
     members
  4. All these groups having these groups as direct or indirect
     members are added to hbactest allowing it to perform the search

LIMITATIONS:
- user SID in hbactest --user parameter is not supported
- only Trusted Admins group members can use this function as it
  uses secret for IPA-Trusted domain link
- List of group SIDs does not contain group memberships outside
  of the trusted domain

https://fedorahosted.org/freeipa/ticket/2997
---
 ipalib/plugins/hbactest.py | 69 +++++++++++++++++++++++++++++++++++++++-------
 ipaserver/dcerpc.py        | 34 +++++++++++++++++++++++
 2 files changed, 93 insertions(+), 10 deletions(-)

diff --git a/ipalib/plugins/hbactest.py b/ipalib/plugins/hbactest.py
index 78fac0241de2ed6b7416029c692ee4bc53327b34..f8e4b0fc6723c3ec4729c577e7fd53b088bbfc32 100644
--- a/ipalib/plugins/hbactest.py
+++ b/ipalib/plugins/hbactest.py
@@ -17,11 +17,19 @@
 # You should have received a copy of the GNU General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
-from ipalib import api, errors, output
+from ipalib import api, errors, output, util
 from ipalib import Command, Str, Flag, Int
 from types import NoneType
 from ipalib.cli import to_cli
 from ipalib import _, ngettext
+from ipapython.dn import DN
+if api.env.in_server and api.env.context in ['lite', 'server']:
+    try:
+        import ipaserver.dcerpc
+        _dcerpc_bindings_installed = True
+    except ImportError:
+        _dcerpc_bindings_installed = False
+
 import pyhbac
 
 __doc__ = _("""
@@ -298,15 +306,56 @@ class hbactest(Command):
         request = pyhbac.HbacRequest()
 
         if options['user'] != u'all':
-            try:
-                request.user.name = options['user']
-                search_result = self.api.Command.user_show(request.user.name)['result']
-                groups = search_result['memberof_group']
-                if 'memberofindirect_group' in search_result:
-                    groups += search_result['memberofindirect_group']
-                request.user.groups = sorted(set(groups))
-            except:
-                pass
+            # check first if this is not a trusted domain user
+            components = util.normalize_name(options['user'])
+            if 'domain' in components or 'flatname' in components:
+                # this is a trusted domain user
+                if not _dcerpc_bindings_installed:
+                    raise errors.NotFound(reason=_(
+                        'Cannot perform external member validation without '
+                        'Samba 4 support installed. Make sure you have installed '
+                        'server-trust-ad sub-package of IPA on the server'))
+                domain_validator = ipaserver.dcerpc.DomainValidator(self.api)
+                if not domain_validator.is_configured():
+                    raise errors.NotFound(reason=_(
+                        'Cannot search in trusted domains without own domain configured. '
+                        'Make sure you have run ipa-adtrust-install on the IPA server first'))
+                user_sid, group_sids = domain_validator.get_trusted_domain_user_and_groups(options['user'])
+                request.user.name = user_sid
+
+                # Now search for all external groups that have this user or
+                # any of its groups in its external members. Found entires
+                # memberOf links will be then used to gather all groups where
+                # this group is assigned, including the nested ones
+                filter_sids = "(&(objectclass=ipaexternalgroup)(|(ipaExternalMember=%s)))" \
+                        % ")(ipaExternalMember=".join(group_sids + [user_sid])
+
+                ldap = self.api.Backend.ldap2
+                group_container = DN(api.env.container_group, api.env.basedn)
+                try:
+                    entries, truncated = ldap.find_entries(filter_sids, ['cn', 'memberOf'], group_container)
+                except errors.NotFound:
+                    request.user.groups = []
+                else:
+                    groups = []
+                    for dn, entry in entries:
+                        memberof_dns = entry.get('memberof', [])
+                        for memberof_dn in memberof_dns:
+                            if memberof_dn.endswith(group_container):
+                                # this is a group object
+                                groups.append(memberof_dn[0][0].value)
+                    request.user.groups = sorted(set(groups))
+            else:
+                # try searching for a local user
+                try:
+                    request.user.name = options['user']
+                    search_result = self.api.Command.user_show(request.user.name)['result']
+                    groups = search_result['memberof_group']
+                    if 'memberofindirect_group' in search_result:
+                        groups += search_result['memberofindirect_group']
+                    request.user.groups = sorted(set(groups))
+                except:
+                    pass
 
         if options['service'] != u'all':
             try:
diff --git a/ipaserver/dcerpc.py b/ipaserver/dcerpc.py
index ef937db048a69323fae59687b5c406424add1a03..d9325e075c3f75f86977bd1656942340fc5d8012 100644
--- a/ipaserver/dcerpc.py
+++ b/ipaserver/dcerpc.py
@@ -257,6 +257,40 @@ class DomainValidator(object):
             raise errors.ValidationError(name=_('trusted domain object'),
                error= _('Trusted domain did not return a valid SID for the object'))
 
+    def get_trusted_domain_user_and_groups(self, object_name):
+        """
+        Returns a tuple with user SID and a list of SIDs of all groups he is
+        a member of.
+
+        LIMITATIONS:
+            - only Trusted Admins group members can use this function as it
+              uses secret for IPA-Trusted domain link
+            - List of group SIDs does not contain group memberships outside
+              of the trusted domain
+        """
+        components = normalize_name(object_name)
+        if not ('domain' in components or 'flatname' in components):
+            # No domain or realm specified, ambiguous search
+            raise errors.ValidationError(name=_('trusted domain object'),
+                   error= _('Ambiguous search, user domain was not specified'))
+
+        attrs = ['cn']
+        filter = '(&(sAMAccountName=%(name)s)(objectClass=user))' \
+                % dict(name=components['name'])
+        entries = self.get_trusted_domain_objects(components.get('domain'),
+                components.get('flatname'), filter, attrs, _ldap.SCOPE_SUBTREE)
+
+        # Get SIDs of user object and it's groups
+        # tokenGroups attribute must be read with scope BASE to avoid search error
+        attrs = ['objectSID', 'tokenGroups']
+        filter = "(objectClass=user)"
+        base_dn = entries[0][0]
+        entries = self.get_trusted_domain_objects(components.get('domain'),
+            components.get('flatname'), filter, attrs, _ldap.SCOPE_BASE, base_dn)
+        object_sid = self.__sid_to_str(entries[0][1]['objectSid'][0])
+        group_sids = [self.__sid_to_str(sid) for sid in entries[0][1]['tokenGroups']]
+        return (object_sid, group_sids)
+
     def __sid_to_str(self, sid):
         """
         Converts binary SID to string representation
-- 
1.7.11.7

_______________________________________________
Freeipa-devel mailing list
Freeipa-devel@redhat.com
https://www.redhat.com/mailman/listinfo/freeipa-devel

Reply via email to