URL: https://github.com/freeipa/freeipa/pull/227
Author: frasertweedale
 Title: #227: cert-request: match names against principal alises
Action: synchronized

To pull the PR as Git branch:
git remote add ghfreeipa https://github.com/freeipa/freeipa
git fetch ghfreeipa pull/227/head:pr227
git checkout pr227
From 8fb85f2fa0982f8557893fe3159d6780082b2d5f Mon Sep 17 00:00:00 2001
From: Fraser Tweedale <ftwee...@redhat.com>
Date: Wed, 26 Oct 2016 09:48:19 +1000
Subject: [PATCH] cert-request: match names against principal alises

Currently we do not check Kerberos principal aliases when validating
a CSR.  Enhance cert-request to accept the following scenarios:

- for hosts and services: CN and SAN dnsNames match a principal
  alias (realm and service name must be same as nominated principal)

- for all principal types: UPN or KRB5PrincipalName othername match
  any principal alias.

Fixes: https://fedorahosted.org/freeipa/ticket/6295
---
 ipaserver/plugins/cert.py                          | 125 ++++++++++++++++-----
 .../test_xmlrpc/test_caacl_profile_enforcement.py  |  86 ++++++++++++--
 2 files changed, 171 insertions(+), 40 deletions(-)

diff --git a/ipaserver/plugins/cert.py b/ipaserver/plugins/cert.py
index 4362d82..d3be89a 100644
--- a/ipaserver/plugins/cert.py
+++ b/ipaserver/plugins/cert.py
@@ -649,11 +649,13 @@ def execute(self, csr, all=False, raw=False, **kw):
         cn = cns[-1].value  # "most specific" is end of list
 
         if principal_type in (SERVICE, HOST):
-            if cn.lower() != principal.hostname.lower():
-                raise errors.ACIError(
-                    info=_("hostname in subject of request '%(cn)s' "
-                        "does not match principal hostname '%(hostname)s'")
-                        % dict(cn=cn, hostname=principal.hostname))
+            if not _dns_name_matches_principal(cn, principal, principal_obj):
+                raise errors.ValidationError(
+                    name='csr',
+                    error=_(
+                        "hostname in subject of request '%(cn)s' does not "
+                        "match name or aliases of principal '%(principal)s'"
+                        ) % dict(cn=cn, principal=principal))
         elif principal_type == USER:
             # check user name
             if cn != principal.username:
@@ -686,26 +688,32 @@ def execute(self, csr, all=False, raw=False, **kw):
             generalnames = x509.process_othernames(ext_san.value)
         for gn in generalnames:
             if isinstance(gn, cryptography.x509.general_name.DNSName):
+                if principal.is_user:
+                    raise errors.ValidationError(
+                        name='csr',
+                        error=_(
+                            "subject alt name type %s is forbidden "
+                            "for user principals") % "DNSName"
+                    )
+
                 name = gn.value
-                alt_principal = None
+
+                if _dns_name_matches_principal(name, principal, principal_obj):
+                    continue  # nothing more to check for this alt name
+
+                # no match yet; check for an alternative principal with
+                # same realm and service type as subject principal.
+                components = list(principal.components)
+                components[-1] = name
+                alt_principal = kerberos.Principal(components, principal.realm)
                 alt_principal_obj = None
                 try:
                     if principal_type == HOST:
-                        alt_principal = kerberos.Principal(
-                            (u'host', name), principal.realm)
-                        alt_principal_obj = api.Command['host_show'](name, all=True)
+                        alt_principal_obj = api.Command['host_show'](
+                            name, all=True)
                     elif principal_type == SERVICE:
-                        alt_principal = kerberos.Principal(
-                            (principal.service_name, name), principal.realm)
                         alt_principal_obj = api.Command['service_show'](
                             alt_principal, all=True)
-                    elif principal_type == USER:
-                        raise errors.ValidationError(
-                            name='csr',
-                            error=_(
-                                "subject alt name type %s is forbidden "
-                                "for user principals") % "DNSName"
-                        )
                 except errors.NotFound:
                     # We don't want to issue any certificates referencing
                     # machines we don't know about. Nothing is stored in this
@@ -713,18 +721,23 @@ def execute(self, csr, all=False, raw=False, **kw):
                     raise errors.NotFound(reason=_('The service principal for '
                         'subject alt name %s in certificate request does not '
                         'exist') % name)
-                if alt_principal_obj is not None:
-                    altdn = alt_principal_obj['result']['dn']
-                    if not ldap.can_write(altdn, "usercertificate"):
-                        raise errors.ACIError(info=_(
-                            "Insufficient privilege to create a certificate "
-                            "with subject alt name '%s'.") % name)
-                if alt_principal is not None and not bypass_caacl:
+
+                # we found an alternative principal;
+                # now check write access and caacl
+                altdn = alt_principal_obj['result']['dn']
+                if not ldap.can_write(altdn, "usercertificate"):
+                    raise errors.ACIError(info=_(
+                        "Insufficient privilege to create a certificate "
+                        "with subject alt name '%s'.") % name)
+                if not bypass_caacl:
                     caacl_check(principal_type, alt_principal, ca, profile_id)
+
             elif isinstance(gn, (x509.KRB5PrincipalName, x509.UPN)):
-                if gn.name != principal_string:
-                    raise errors.ACIError(
-                        info=_(
+                if not _principal_name_matches_principal(
+                        gn.name, principal_obj):
+                    raise errors.ValidationError(
+                        name='csr',
+                        error=_(
                             "Principal '%s' in subject alt name does not "
                             "match requested principal") % gn.name)
             elif isinstance(gn, cryptography.x509.general_name.RFC822Name):
@@ -784,6 +797,62 @@ def execute(self, csr, all=False, raw=False, **kw):
         )
 
 
+def _dns_name_matches_principal(name, principal, principal_obj):
+    """
+    Ensure that a DNS name matches the given principal.
+
+    :param name: The DNS name to match
+    :param principal: The subject ``Principal``
+    :param principal_obj: The subject principal's LDAP object
+    :return: True if name matches, otherwise False
+
+    """
+    for alias in _collect_principal_aliases(principal_obj):
+        # we can only compare them if both subject principal and
+        # the alias are service or host principals
+        if not (alias.is_service and principal.is_service):
+            continue
+
+        # ignore aliases with different realm or service name from
+        # subject principal
+        if alias.realm != principal.realm:
+            continue
+        if alias.service_name != principal.service_name:
+            continue
+
+        # now compare DNS name to alias hostname
+        if name.lower() == alias.hostname.lower():
+            return True  # we have a match
+
+    return False
+
+
+def _principal_name_matches_principal(name, principal_obj):
+    """
+    Ensure that a stringy principal name (e.g. from UPN
+    or KRB5PrincipalName OtherName) matches the given principal.
+
+    """
+    try:
+        principal = kerberos.Principal(name)
+    except ValueError:
+        return False
+
+    return principal in _collect_principal_aliases(principal_obj)
+
+
+def _collect_principal_aliases(principal_obj):
+    """
+    Return all Kerberos ``Principal`` aliases for given principal
+    (including the canonical name).
+
+    """
+    aliases = set(principal_obj.get('krbprincipalname', []))
+    for alias in principal_obj.get('krbcanonicalname', []):
+        aliases.add(alias)
+    return aliases
+
+
 @register()
 class cert_status(Retrieve, BaseCertMethod, VirtualCommand):
     __doc__ = _('Check the status of a certificate signing request.')
diff --git a/ipatests/test_xmlrpc/test_caacl_profile_enforcement.py b/ipatests/test_xmlrpc/test_caacl_profile_enforcement.py
index a5cc3ac..a2859f5 100644
--- a/ipatests/test_xmlrpc/test_caacl_profile_enforcement.py
+++ b/ipatests/test_xmlrpc/test_caacl_profile_enforcement.py
@@ -437,7 +437,7 @@ def santest_csr(request, santest_host_1, santest_host_2):
     return unicode(csr)
 
 
-class CAACLEnforcementOnCertBase(XMLRPC_test):
+class SubjectAltNameOneServiceBase(XMLRPC_test):
     """Base setup class for tests with SAN in CSR
 
     The class prepares an environment for test cases based
@@ -445,12 +445,10 @@ class CAACLEnforcementOnCertBase(XMLRPC_test):
 
     The class creates following entries:
 
-        * two host entries
+        * host entry
             * santest-host-1
-            * santest-host-2
-        * two service entries
+        * service entry
             * srv/santest-host-1
-            * srv/santest-host-2
         * Sub CA
             * default-profile-subca
 
@@ -467,12 +465,9 @@ class CAACLEnforcementOnCertBase(XMLRPC_test):
             * default-profile-subca -- CA
             * caIPAServiceCert      -- profile
     """
-
-    def test_prepare_caacl_hosts(self, santest_subca_acl,
-                                 santest_host_1, santest_host_2):
+    def test_prepare_caacl_hosts(self, santest_subca_acl, santest_host_1):
         santest_subca_acl.ensure_exists()
         santest_host_1.ensure_exists()
-        santest_host_2.ensure_exists()
         santest_subca_acl.add_host(santest_host_1.name)
 
     def test_prepare_caacl_CA(self, santest_subca_acl, santest_subca):
@@ -483,12 +478,79 @@ def test_prepare_caacl_profile(self, santest_subca_acl):
         santest_subca_acl.add_profile(u'caIPAserviceCert')
 
     def test_prepare_caacl_services(self, santest_subca_acl,
-                                    santest_service_host_1,
-                                    santest_service_host_2):
+                                    santest_service_host_1):
         santest_service_host_1.ensure_exists()
+        santest_subca_acl.add_service(santest_service_host_1.name)
+
+
+class CAACLEnforcementOnCertBase(SubjectAltNameOneServiceBase):
+    """
+    Base setup class for tests with SAN in CSR, where
+    multiple hosts and services are in play.
+
+    In addition to the host and service created in the base class,
+    this class adds the following entries to the environment:
+
+        * host entry
+            * santest-host-2
+        * service entry
+            * srv/santest-host-2
+
+    """
+    def test_prepare_add_host_2(self, santest_host_2, santest_service_host_2):
+        santest_host_2.ensure_exists()
         santest_service_host_2.ensure_exists()
 
-        santest_subca_acl.add_service(santest_service_host_1.name)
+
+@pytest.mark.tier1
+class TestNoMatchForSubjectAltNameDnsName(SubjectAltNameOneServiceBase):
+    """Sign certificate request with an invalid SAN dnsName.
+
+    The CSR includes a DNS name that does not correspond to a
+    principal alias or alternative principal.
+
+    """
+    def test_request_cert_with_not_allowed_SAN(
+            self, santest_subca, santest_host_1,
+            santest_service_host_1, santest_csr):
+
+        with host_keytab(santest_host_1.name) as keytab_filename:
+            with change_principal(santest_host_1.attrs['krbcanonicalname'][0],
+                                  keytab=keytab_filename):
+                with pytest.raises(errors.NotFound):
+                    api.Command.cert_request(
+                        santest_csr,
+                        principal=santest_service_host_1.name,
+                        cacn=santest_subca.name
+                    )
+
+
+@pytest.mark.tier1
+class TestPrincipalAliasForSubjectAltNameDnsName(SubjectAltNameOneServiceBase):
+    """Sign certificate request with an invalid SAN dnsName.
+
+    The CSR includes a DNS name that shall correspond to a Kerberos
+    alias of the service.  Request should succeed.
+
+    """
+    def test_add_principal_alias(
+            self, santest_service_host_1):
+        api.Command.service_add_principal(
+            santest_service_host_1.name,
+            'srv/santest-host-2.ipa.local')
+
+    def test_request_cert_with_SAN_matching_principal_alias(
+            self, santest_subca, santest_host_1,
+            santest_service_host_1, santest_csr):
+        with host_keytab(santest_host_1.name) as keytab_filename:
+            with change_principal(
+                    santest_host_1.attrs['krbcanonicalname'][0],
+                    keytab=keytab_filename):
+                api.Command.cert_request(
+                    santest_csr,
+                    principal=santest_service_host_1.name,
+                    cacn=santest_subca.name
+                )
 
 
 @pytest.mark.tier1
-- 
Manage your subscription for the Freeipa-devel mailing list:
https://www.redhat.com/mailman/listinfo/freeipa-devel
Contribute to FreeIPA: http://www.freeipa.org/page/Contribute/Code

Reply via email to