On 17.1.2014 11:39, Jan Cholasta wrote:
On 10.1.2014 13:34, Martin Kosek wrote:
On 01/09/2014 04:49 PM, Simo Sorce wrote:
On Thu, 2014-01-09 at 10:44 -0500, Rob Crittenden wrote:
Martin Kosek wrote:
On 01/09/2014 03:12 PM, Simo Sorce wrote:

Also, maybe we should allow admins to bypass the need to have an actual
object to represent the alt name?

I'd rather not. This would allow a rogue admin to create a cert for
www.google.com. Sure, they could also create a host for that, but
forcing them to add more entries increases the chances of them getting
caught doing it.

They can remove the host right after they create a cert, so I honestly
do not think this is a valid concern. If your admin is rogue he can
already take full ownership of your infrastructure in many ways;
preventing setting a name in a cert doesn't really make a difference IMO.

However I would be ok to limit this to some new "Security Admin/CA
Admin" role that is not assigned by default.

Simo.


Ok, let's reach some conclusion here. I would really like to not defer
this feature for too long, as it is quite wanted. Would creating a new
virtual operation "Request certificate with SAN" make the situation
better? It would not be so difficult to do: the check_access function
can already accept the virtual operation name as a parameter, we just
need to call it.

Why don't we treat SAN hostnames the same way as the subject hostname?
The way I see it, with SAN the only difference is that there is a set of
hostnames instead of just a single hostname, so maybe we should support
requesting a certificate for a set of hosts/services instead of just a
single host/service.

As far as authorization is concerned, currently you can request a
certificate for a single host/service, if you have the "Request
certificate" permission and write access to the host/service entry. With
multiple hosts/services, you would be able to request a certificate if
you have the "Request certificate" permission and write access to *all*
of the host/service entries you are requesting the certificate for.
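
To make the rule above concrete, here is a minimal standalone sketch in
plain Python. It is only an illustration of the proposed check, not
FreeIPA code: the function name, its parameters and the use of plain
sets and lists are all assumptions made for the example, and rejecting
an empty request is an extra assumption on my side.

```python
def may_request_certificate(has_request_permission, writable_entries,
                            requested_entries):
    """Illustrative only: decide whether a request would be authorized.

    has_request_permission -- caller holds the "Request certificate" permission
    writable_entries       -- entries the caller has write access to (hypothetical)
    requested_entries      -- host/service entries named in the request
    """
    # Assumption: a request must name at least one host/service.
    if not requested_entries:
        return False
    # The permission is required regardless of how many entries are named.
    if not has_request_permission:
        return False
    # Write access is required for *all* requested entries, not just one.
    return all(entry in writable_entries for entry in requested_entries)
```

With a single entry this reduces to the current behaviour; with several
entries, one entry without write access is enough to refuse the request.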

Effectively this means that cert-request would accept multiple
principals instead of single principal and the automatic revocation code
in cert-request, host-del and service-del would take into account that a
single certificate might be assigned to multiple entities.
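
Accepting multiple principals also implies a two-way name check between
the principals and the CSR. The following is a rough standalone sketch
of that check, assuming the hostnames have already been extracted from
the principals and from the request. The function name, its arguments
and the use of ValueError are hypothetical and chosen for the example;
the real command raises its own error types.

```python
def check_hostnames(principal_hosts, subject_cn, san_names=()):
    """Illustrative only: match principal hostnames against CSR names.

    principal_hosts -- hostnames taken from the requested principals
    subject_cn      -- common name from the CSR subject
    san_names       -- hostnames from the CSR subject alt name, if any
    """
    # Comparison is case-insensitive, so normalize everything first.
    hosts = [host.lower() for host in principal_hosts]
    csr_names = [subject_cn.lower()] + [name.lower() for name in san_names]

    # Every principal must have a matching hostname in the request.
    for host in hosts:
        if host not in csr_names:
            raise ValueError(
                "no matching hostname found in request: %s" % host)

    # Every hostname in the request must match some principal.
    for name in csr_names:
        if name not in hosts:
            raise ValueError(
                "hostname in request '%s' does not match any principal "
                "hostname" % name)

    return hosts
```

For example, two principals for www.example.com and alt.example.com pass
if the CSR has one of these names as subject and the other as SAN, and
fail if the CSR carries an additional name that no principal covers.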


See attachment for patch which implements the above design.

--
Jan Cholasta
From bb95b3afd6786adc04aa4cd5ac114fbace964907 Mon Sep 17 00:00:00 2001
From: Jan Cholasta <jchol...@redhat.com>
Date: Mon, 20 Jan 2014 10:59:08 +0100
Subject: [PATCH] Support SAN in cert-request.

https://fedorahosted.org/freeipa/ticket/3977
---
 API.txt                |   2 +-
 VERSION                |   3 +-
 ipalib/plugins/cert.py | 191 +++++++++++++++++++++++++++----------------------
 3 files changed, 107 insertions(+), 89 deletions(-)

diff --git a/API.txt b/API.txt
index a6c3aed..015d829 100644
--- a/API.txt
+++ b/API.txt
@@ -479,7 +479,7 @@ command: cert_request
 args: 1,4,1
 arg: File('csr', cli_name='csr_file')
 option: Flag('add', autofill=True, default=False)
-option: Str('principal')
+option: Str('principal+')
 option: Str('request_type', autofill=True, default=u'pkcs10')
 option: Str('version?', exclude='webui')
 output: Output('result', <type 'dict'>, None)
diff --git a/VERSION b/VERSION
index 5ce16b5..c9f06d1 100644
--- a/VERSION
+++ b/VERSION
@@ -89,4 +89,5 @@ IPA_DATA_VERSION=20100614120000
 #                                                      #
 ########################################################
 IPA_API_VERSION_MAJOR=2
-IPA_API_VERSION_MINOR=72
+IPA_API_VERSION_MINOR=73
+# Last change: jcholast - SAN certificate requests
diff --git a/ipalib/plugins/cert.py b/ipalib/plugins/cert.py
index 762f13b..d343f99 100644
--- a/ipalib/plugins/cert.py
+++ b/ipalib/plugins/cert.py
@@ -249,7 +249,7 @@ class cert_request(VirtualCommand):
     operation="request certificate"
 
     takes_options = (
-        Str('principal',
+        Str('principal+',
             label=_('Principal'),
             doc=_('Service principal for this certificate (e.g. HTTP/test.example.com)'),
         ),
@@ -301,13 +301,7 @@ class cert_request(VirtualCommand):
         ),
     )
 
-    def execute(self, csr, **kw):
-        ldap = self.api.Backend.ldap2
-        principal = kw.get('principal')
-        add = kw.get('add')
-        request_type = kw.get('request_type')
-        service = None
-
+    def execute(self, csr, **options):
         """
         Access control is partially handled by the ACI titled
         'Hosts can modify service userCertificate'. This is for the case
@@ -324,73 +318,101 @@ class cert_request(VirtualCommand):
         if not bind_principal.startswith('host/'):
             self.check_access()
 
-        # FIXME: add support for subject alt name
-
-        # Ensure that the hostname in the CSR matches the principal
-        subject_host = get_csr_hostname(csr)
-        if not subject_host:
-            raise errors.ValidationError(name='csr',
+        request = None
+        try:
+            request = pkcs10.load_certificate_request(csr)
+            subject = pkcs10.get_subject(request)
+            subjectaltname = pkcs10.get_subjectaltname(request)
+        except NSPRError, e:
+            raise errors.CertificateOperationError(
+                error=_('Failure decoding Certificate Signing Request: %s') % e)
+        finally:
+            if request is not None:
+                del request
+
+        subjectname = subject.common_name
+        if not subjectname:
+            raise errors.ValidationError(
+                name='csr',
                 error=_("No hostname was found in subject of request."))
 
-        (servicename, hostname, realm) = split_principal(principal)
-        if subject_host.lower() != hostname.lower():
-            raise errors.ACIError(
-                info=_("hostname in subject of request '%(subject_host)s' "
-                    "does not match principal hostname '%(hostname)s'") % dict(
-                        subject_host=subject_host, hostname=hostname))
-
-        dn = None
-        service = None
-        # See if the service exists and punt if it doesn't and we aren't
-        # going to add it
-        try:
-            if not principal.startswith('host/'):
-                service = api.Command['service_show'](principal, all=True)['result']
-                dn = service['dn']
-            else:
-                hostname = get_host_from_principal(principal)
-                service = api.Command['host_show'](hostname, all=True)['result']
-                dn = service['dn']
-        except errors.NotFound, e:
-            if not add:
-                raise errors.NotFound(reason=_("The service principal for "
-                    "this request doesn't exist."))
+        if subjectaltname is None:
+            subjectaltname = ()
+
+        hostnames = []
+        servicename = None
+        for i, principal in enumerate(options['principal']):
+            (p_servicename, hostname, realm) = split_principal(principal)
+
+            # All principals must have the same service name
+            if servicename is None:
+                servicename = p_servicename
+            elif p_servicename != servicename:
+                raise errors.ValidationError(
+                    name='principal', index=i, value=principal,
+                    error=_("service name must be '%s'") % servicename)
+
+            # There must be a matching hostname in the request
+            hostname = hostname.lower()
+            if hostname != subjectname.lower():
+                for san in subjectaltname:
+                    if hostname == san.lower():
+                        break
+                else:
+                    raise errors.ValidationError(
+                        name='principal', index=i, value=principal,
+                        error=_("no matching hostname found in request"))
+
+            hostnames.append(hostname)
+
+        for hostname in (subjectname,) + subjectaltname:
+            if hostname.lower() not in hostnames:
+                if hostname == subjectname:
+                    error = _("hostname in subject of request '%s' does not "
+                              "match any principal hostname")
+                else:
+                    error = _("hostname in SAN of request '%s' does not "
+                              "match any principal hostname")
+                raise errors.ValidationError(
+                    name='csr', error=error % hostname)
+
+        ldap = api.Backend.ldap2
+        if servicename == 'host':
+            obj = api.Object.host
+            keys = hostnames
+        else:
+            obj = api.Object.service
+            keys = options['principal']
+
+        for key in keys:
+            # See if the service exists and punt if it doesn't and we aren't
+            # going to add it
             try:
-                service = api.Command['service_add'](principal, **{'force': True})['result']
-                dn = service['dn']
-            except errors.ACIError:
-                raise errors.ACIError(info=_('You need to be a member of '
-                    'the serviceadmin role to add services'))
-
-        # We got this far so the service entry exists, can we write it?
-        if not ldap.can_write(dn, "usercertificate"):
-            raise errors.ACIError(info=_("Insufficient 'write' privilege "
-                "to the 'userCertificate' attribute of entry '%s'.") % dn)
-
-        # Validate the subject alt name, if any
-        request = pkcs10.load_certificate_request(csr)
-        subjectaltname = pkcs10.get_subjectaltname(request)
-        if subjectaltname is not None:
-            for name in subjectaltname:
-                name = unicode(name)
+                service = obj.methods.show(key, all=True)['result']
+            except errors.NotFound, e:
+                if not options['add']:
+                    raise errors.NotFound(
+                        reason=_("The service principal for this request "
+                                 "doesn't exist."))
                 try:
-                    hostentry = api.Command['host_show'](name, all=True)['result']
-                    hostdn = hostentry['dn']
-                except errors.NotFound:
-                    # We don't want to issue any certificates referencing
-                    # machines we don't know about. Nothing is stored in this
-                    # host record related to this certificate.
-                    raise errors.NotFound(reason=_('no host record for '
-                        'subject alt name %s in certificate request') % name)
-                authprincipal = getattr(context, 'principal')
-                if authprincipal.startswith("host/"):
-                    if not hostdn in service.get('managedby_host', []):
-                        raise errors.ACIError(info=_(
-                            "Insufficient privilege to create a certificate "
-                            "with subject alt name '%s'.") % name)
-
-        if 'usercertificate' in service:
-            serial = x509.get_serial_number(service['usercertificate'][0], datatype=x509.DER)
+                    service = obj.methods.add(key, force=True)['result']
+                except errors.ACIError:
+                    raise errors.ACIError(
+                        info=_("You need to be a member of the serviceadmin "
+                               "role to add services"))
+
+            # We got this far so the service entry exists, can we write it?
+            dn = service['dn']
+            if not ldap.can_write(dn, "usercertificate"):
+                raise errors.ACIError(
+                    info=_("Insufficient 'write' privilege to the "
+                           "'userCertificate' attribute of entry '%s'.") % dn)
+
+            if 'usercertificate' not in service:
+                continue
+
+            serial = x509.get_serial_number(service['usercertificate'][0],
+                                            datatype=x509.DER)
             # revoke the certificate and remove it from the service
             # entry before proceeding. First we retrieve the certificate to
             # see if it is already revoked, if not then we revoke it.
@@ -398,38 +420,33 @@ class cert_request(VirtualCommand):
                 result = api.Command['cert_show'](unicode(serial))['result']
                 if 'revocation_reason' not in result:
                     try:
-                        api.Command['cert_revoke'](unicode(serial), revocation_reason=4)
+                        api.Command['cert_revoke'](unicode(serial),
+                                                   revocation_reason=4)
                     except errors.NotImplementedError:
                         # some CA's might not implement revoke
                         pass
             except errors.NotImplementedError:
                 # some CA's might not implement get
                 pass
-            if not principal.startswith('host/'):
-                api.Command['service_mod'](principal, usercertificate=None)
-            else:
-                hostname = get_host_from_principal(principal)
-                api.Command['host_mod'](hostname, usercertificate=None)
+
+            obj.methods.mod(key, usercertificate=None)
 
         # Request the certificate
         result = self.Backend.ra.request_certificate(
-            csr, request_type=request_type)
+            csr, request_type=options['request_type'])
         cert = x509.load_certificate(result['certificate'])
         result['issuer'] = unicode(cert.issuer)
         result['valid_not_before'] = unicode(cert.valid_not_before_str)
         result['valid_not_after'] = unicode(cert.valid_not_after_str)
-        result['md5_fingerprint'] = unicode(nss.data_to_hex(nss.md5_digest(cert.der_data), 64)[0])
-        result['sha1_fingerprint'] = unicode(nss.data_to_hex(nss.sha1_digest(cert.der_data), 64)[0])
+        result['md5_fingerprint'] = unicode(
+            nss.data_to_hex(nss.md5_digest(cert.der_data), 64)[0])
+        result['sha1_fingerprint'] = unicode(
+            nss.data_to_hex(nss.sha1_digest(cert.der_data), 64)[0])
 
         # Success? Then add it to the service entry.
         if 'certificate' in result:
-            if not principal.startswith('host/'):
-                skw = {"usercertificate": str(result.get('certificate'))}
-                api.Command['service_mod'](principal, **skw)
-            else:
-                hostname = get_host_from_principal(principal)
-                skw = {"usercertificate": str(result.get('certificate'))}
-                api.Command['host_mod'](hostname, **skw)
+            for key in keys:
+                obj.methods.mod(key, usercertificate=str(result['certificate']))
 
         return dict(
             result=result
-- 
1.8.4.2
