On 07/02/2015 11:12 AM, Martin Babinsky wrote:
On 07/01/2015 03:05 PM, Martin Babinsky wrote:
On 06/30/2015 02:45 PM, Martin Babinsky wrote:
On 06/30/2015 01:11 PM, Martin Babinsky wrote:
On 06/30/2015 12:04 PM, Jan Cholasta wrote:
On 29.6.2015 at 10:36, Martin Babinsky wrote:
On 06/23/2015 01:49 PM, Martin Babinsky wrote:
This patchset implements new API commands for manipulating the
user/host/service userCertificate attribute, alongside some underlying
plumbing.

PATCH 0045 is a small test suite that I put together, since manual
testing of this stuff is very cumbersome. It requires my PATCH 0040,
which was recently pushed to master
(commit 74883bbc959058c8bfafd9f63e8fad0e3792ac28), to apply and work.

The work is related to
http://www.freeipa.org/page/V4/User_Certificates
and https://fedorahosted.org/freeipa/ticket/4238



Attaching updated patches.

Here are some notes for Jan, because I did some things differently than
we agreed on during review:


1.) I chose not to rename 'usercertificate' to 'usercertificate;binary'
and back in pre/post callbacks. Despite the fact that the correct way to
name the certificate attribute is 'usercertificate;binary', I feel that
suddenly renaming it in the new code is asking for trouble.

New code is new, there is no renaming, there is naming, and that naming
should follow standards, and the standard is userCertificate;binary.

(For the record, I did not ask for any renaming in *old* host and
service code.)

OK, I will then use 'usercertificate;binary' and try not to break
things.

I'm all for changing the mapping between CLI options and actual
attribute names but it should be done in a systematic fashion.

+1, shall I post a patch?

That would be great, but I'm not sure if there is time for it. Maybe we
can create a ticket for tracking?

2.) I have kept the `normalize_certs` function. It has the potential to
catch incorrectly formatted/encoded certificates and in a way
circumvents the slightly demented way the framework deals with
supposedly binary data.

One sentence above you asked for doing things in a systematic fashion.
This is exactly what it isn't. A systematic solution would be a new
parameter type for certificates.

Ha, I didn't notice that incorrect encoding is caught by the validator.

But I think that we still need to catch malformed certificates that
cannot be decoded to DER, and AFAIK we don't do that anywhere (tests
that add a random Base64-encoded string fail, which confirms this).
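
To illustrate the point above, here is a minimal sketch (written for Python 3, and not the IPA code path) showing that a string can be perfectly valid Base64 and still not be DER. The helper `looks_like_der_sequence` is a hypothetical name; it only inspects the outer SEQUENCE header, which is far weaker than the full parse that NSS performs.

```python
import base64


def looks_like_der_sequence(data: bytes) -> bool:
    """Check only the outer DER SEQUENCE header (tag 0x30 plus length).

    This is a structural sanity check, not a real X.509 parser.
    """
    if len(data) < 2 or data[0] != 0x30:
        return False
    first = data[1]
    if first < 0x80:
        # Short form: the length fits in a single byte.
        return len(data) == 2 + first
    n = first & 0x7F
    if n == 0 or len(data) < 2 + n:
        # Indefinite length is not allowed in DER.
        return False
    length = int.from_bytes(data[2:2 + n], "big")
    return len(data) == 2 + n + length


# A random Base64 string decodes without any error...
random_blob = base64.b64encode(b"definitely not a certificate")
decoded = base64.b64decode(random_blob)

# ...but the decoded bytes do not even start like a DER SEQUENCE.
is_der = looks_like_der_sequence(decoded)
```

A Base64-only validator accepts `random_blob`, which is why a separate DER check is needed.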

All this probably stems from my confusion about the way IPA framework
guesses binary data. For example, if I call `api.Command.user_add_cert`
and fill 'certificate' option with Base64 blob reencoded to Unicode,
everything works as expected.

However, filling this option with a 'str' leads to another round of
Base64 encoding in the framework, so 'userCertificate;binary' ends up
holding the original Base64 blob instead of the DER-encoded cert.
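
The double-encoding effect described above can be reproduced outside the framework. This is a sketch under assumptions (Python 3, with `store_value` as a hypothetical stand-in for a server side that decodes the wire value exactly once); it does not reproduce the actual IPA code path.

```python
import base64

# Stand-in for DER bytes (a tiny DER SEQUENCE, not a real certificate).
der_cert = bytes([0x30, 0x03, 0x02, 0x01, 0x05])

# What the caller holds: the usual Base64 form of the certificate.
b64_blob = base64.b64encode(der_cert)


def store_value(wire_value: bytes) -> bytes:
    """Hypothetical server side: decode the wire value once and store it."""
    return base64.b64decode(wire_value)


# Case 1: the blob travels as text and is decoded once.
stored_ok = store_value(b64_blob)

# Case 2: an intermediate layer treats the blob as raw binary and
# Base64-encodes it again before sending.
stored_bad = store_value(base64.b64encode(b64_blob))
```

In the second case a single decode returns the Base64 text rather than the DER bytes, which matches the symptom described for 'userCertificate;binary'.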


I have also added two negative test cases which deal with incorrectly
encoded and formatted certificates.





Attaching updated patches (actually only 44 is updated, I added the
rename to/from 'usercertificate;binary' to user pre/post callbacks).



Another patch update attached (mainly fixing pep8 complaints and
reworking certificate validation).




Updated patches attached.




I left a bug in PATCH 0043. Attaching an updated version.

--
Martin^3 Babinsky
From 367f0791f6ec768375a0b16fe98ece8ec94c9e1a Mon Sep 17 00:00:00 2001
From: Martin Babinsky <mbabi...@redhat.com>
Date: Tue, 23 Jun 2015 13:40:30 +0200
Subject: [PATCH 2/4] reworked certificate normalization and revocation

Validation of certificate is now handled by `x509.validate_certificate`.

Revocation of the host and service certificates was factored out to a separate
function.

Part of http://www.freeipa.org/page/V4/User_Certificates
---
 ipalib/plugins/host.py    |  75 +++-------------------------------
 ipalib/plugins/service.py | 102 +++++++++++++---------------------------------
 ipalib/x509.py            |  14 +++++--
 3 files changed, 45 insertions(+), 146 deletions(-)

diff --git a/ipalib/plugins/host.py b/ipalib/plugins/host.py
index e81dca94e124b080a3d68a3b1cfd079710e30336..f5871f81e49c0bf49fd2cba14fdde7d1b570e462 100644
--- a/ipalib/plugins/host.py
+++ b/ipalib/plugins/host.py
@@ -786,30 +786,8 @@ class host_del(LDAPDelete):
                 entry_attrs = ldap.get_entry(dn, ['usercertificate'])
             except errors.NotFound:
                 self.obj.handle_not_found(*keys)
-            for cert in entry_attrs.get('usercertificate', []):
-                cert = x509.normalize_certificate(cert)
-                try:
-                    serial = unicode(x509.get_serial_number(cert, x509.DER))
-                    try:
-                        result = api.Command['cert_show'](serial)['result']
-                        if 'revocation_reason' not in result:
-                            try:
-                                api.Command['cert_revoke'](serial,
-                                                           revocation_reason=4)
-                            except errors.NotImplementedError:
-                                # some CA's might not implement revoke
-                                pass
-                    except errors.NotImplementedError:
-                        # some CA's might not implement revoke
-                        pass
-                except NSPRError, nsprerr:
-                    if nsprerr.errno == -8183:
-                        # If we can't decode the cert them proceed with
-                        # removing the host.
-                        self.log.info("Problem decoding certificate %s" %
-                                      nsprerr.args[1])
-                    else:
-                        raise nsprerr
+
+            revoke_certs(entry_attrs.get('usercertificate', []), self.log)
 
         return dn
 
@@ -879,29 +857,8 @@ class host_mod(LDAPUpdate):
             old_certs = entry_attrs_old.get('usercertificate', [])
             old_certs_der = map(x509.normalize_certificate, old_certs)
             removed_certs_der = set(old_certs_der) - set(certs_der)
-            for cert in removed_certs_der:
-                try:
-                    serial = unicode(x509.get_serial_number(cert, x509.DER))
-                    try:
-                        result = api.Command['cert_show'](serial)['result']
-                        if 'revocation_reason' not in result:
-                            try:
-                                api.Command['cert_revoke'](
-                                    serial, revocation_reason=4)
-                            except errors.NotImplementedError:
-                                # some CA's might not implement revoke
-                                pass
-                    except errors.NotImplementedError:
-                        # some CA's might not implement revoke
-                        pass
-                except NSPRError, nsprerr:
-                    if nsprerr.errno == -8183:
-                        # If we can't decode the cert them proceed with
-                        # modifying the host.
-                        self.log.info("Problem decoding certificate %s" %
-                                      nsprerr.args[1])
-                    else:
-                        raise nsprerr
+            revoke_certs(removed_certs_der, self.log)
+
         if certs:
             entry_attrs['usercertificate'] = certs_der
 
@@ -1163,31 +1120,9 @@ class host_disable(LDAPQuery):
             self.obj.handle_not_found(*keys)
         if self.api.Command.ca_is_enabled()['result']:
             certs = entry_attrs.get('usercertificate', [])
-            for cert in map(x509.normalize_certificate, certs):
-                try:
-                    serial = unicode(x509.get_serial_number(cert, x509.DER))
-                    try:
-                        result = api.Command['cert_show'](serial)['result']
-                        if 'revocation_reason' not in result:
-                            try:
-                                api.Command['cert_revoke'](serial,
-                                                           revocation_reason=4)
-                            except errors.NotImplementedError:
-                                # some CA's might not implement revoke
-                                pass
-                    except errors.NotImplementedError:
-                        # some CA's might not implement revoke
-                        pass
-                except NSPRError, nsprerr:
-                    if nsprerr.errno == -8183:
-                        # If we can't decode the cert them proceed with
-                        # disabling the host.
-                        self.log.info("Problem decoding certificate %s" %
-                                      nsprerr.args[1])
-                    else:
-                        raise nsprerr
 
             if certs:
+                revoke_certs(certs, self.log)
                 # Remove the usercertificate altogether
                 entry_attrs['usercertificate'] = None
                 ldap.update_entry(entry_attrs)
diff --git a/ipalib/plugins/service.py b/ipalib/plugins/service.py
index 166d978a248e7c5da6f8df4b534edad0a0799b7e..08606eddcea7c34198742c6fda76ff63b65818ab 100644
--- a/ipalib/plugins/service.py
+++ b/ipalib/plugins/service.py
@@ -238,16 +238,32 @@ def normalize_principal(principal):
 
 def validate_certificate(ugettext, cert):
     """
-    For now just verify that it is properly base64-encoded.
+    Check whether the certificate is properly encoded to DER
     """
-    if cert and util.isvalid_base64(cert):
+    x509.validate_certificate(cert, datatype=x509.DER)
+
+
+def revoke_certs(certs, logger=None):
+    """
+    revoke the certificates removed from host/service entry
+    """
+    for cert in [x509.normalize_certificate(i) for i in certs]:
         try:
-            base64.b64decode(cert)
-        except Exception, e:
-            raise errors.Base64DecodeError(reason=str(e))
-    else:
-        # We'll assume this is DER data
-        pass
+            x509.validate_certificate(cert, x509.DER)
+            serial = unicode(x509.get_serial_number(cert, x509.DER))
+
+            result = api.Command['cert_show'](unicode(serial))['result']
+            if 'revocation_reason' not in result:
+                api.Command['cert_revoke'](unicode(serial),
+                                           revocation_reason=4)
+
+        except errors.NotImplementedError:
+            # some CA's might not implement revoke
+            pass
+        except errors.CertificateFormatError as e:
+            if logger is not None:
+                logger.info("Problem decoding certificate: %s" % e)
+
 
 def set_certificate_attrs(entry_attrs):
     """
@@ -566,27 +582,8 @@ class service_del(LDAPDelete):
                 entry_attrs = ldap.get_entry(dn, ['usercertificate'])
             except errors.NotFound:
                 self.obj.handle_not_found(*keys)
-            for cert in entry_attrs.get('usercertificate', []):
-                try:
-                    serial = unicode(x509.get_serial_number(cert, x509.DER))
-                    try:
-                        result = api.Command['cert_show'](unicode(serial))['result']
-                        if 'revocation_reason' not in result:
-                            try:
-                                api.Command['cert_revoke'](unicode(serial), revocation_reason=4)
-                            except errors.NotImplementedError:
-                                # some CA's might not implement revoke
-                                pass
-                    except errors.NotImplementedError:
-                        # some CA's might not implement revoke
-                        pass
-                except NSPRError, nsprerr:
-                    if nsprerr.errno == -8183:
-                        # If we can't decode the cert them proceed with
-                        # removing the service.
-                        self.log.info("Problem decoding certificate %s" % nsprerr.args[1])
-                    else:
-                        raise nsprerr
+            revoke_certs(entry_attrs.get('usercertificate', []), self.log)
+
         return dn
 
 
@@ -622,29 +619,8 @@ class service_mod(LDAPUpdate):
             old_certs = entry_attrs_old.get('usercertificate', [])
             old_certs_der = map(x509.normalize_certificate, old_certs)
             removed_certs_der = set(old_certs_der) - set(certs_der)
-            for cert in removed_certs_der:
-                try:
-                    serial = unicode(x509.get_serial_number(cert, x509.DER))
-                    try:
-                        result = api.Command['cert_show'](serial)['result']
-                        if 'revocation_reason' not in result:
-                            try:
-                                api.Command['cert_revoke'](
-                                    serial, revocation_reason=4)
-                            except errors.NotImplementedError:
-                                # some CA's might not implement revoke
-                                pass
-                    except errors.NotImplementedError:
-                        # some CA's might not implement revoke
-                        pass
-                except NSPRError, nsprerr:
-                    if nsprerr.errno == -8183:
-                        # If we can't decode the cert them proceed with
-                        # modifying the host.
-                        self.log.info("Problem decoding certificate %s" %
-                                      nsprerr.args[1])
-                    else:
-                        raise nsprerr
+            revoke_certs(removed_certs_der, self.log)
+
         if certs:
             entry_attrs['usercertificate'] = certs_der
 
@@ -854,29 +830,9 @@ class service_disable(LDAPQuery):
 
         if self.api.Command.ca_is_enabled()['result']:
             certs = entry_attrs.get('usercertificate', [])
-            for cert in map(x509.normalize_certificate, certs):
-                try:
-                    serial = unicode(x509.get_serial_number(cert, x509.DER))
-                    try:
-                        result = api.Command['cert_show'](unicode(serial))['result']
-                        if 'revocation_reason' not in result:
-                            try:
-                                api.Command['cert_revoke'](unicode(serial), revocation_reason=4)
-                            except errors.NotImplementedError:
-                                # some CA's might not implement revoke
-                                pass
-                    except errors.NotImplementedError:
-                        # some CA's might not implement revoke
-                        pass
-                except NSPRError, nsprerr:
-                    if nsprerr.errno == -8183:
-                        # If we can't decode the cert them proceed with
-                        # disabling the service
-                        self.log.info("Problem decoding certificate %s" % nsprerr.args[1])
-                    else:
-                        raise nsprerr
 
             if len(certs) > 0:
+                revoke_certs(certs, self.log)
                 # Remove the usercertificate altogether
                 entry_attrs['usercertificate'] = None
                 ldap.update_entry(entry_attrs)
diff --git a/ipalib/x509.py b/ipalib/x509.py
index a87dbf4130c60b1b1daf8bbb2ffb81c208f2529c..edd73ebdc3b3732d326cd8f414bc957f1e4deb87 100644
--- a/ipalib/x509.py
+++ b/ipalib/x509.py
@@ -294,16 +294,24 @@ def normalize_certificate(rawcert):
     # was base64-encoded and now its not or it came in as DER format.
     # Let's decode it and see. Fetching the serial number will pass the
     # certificate through the NSS DER parser.
+    validate_certificate(dercert, datatype=DER)
+
+    return dercert
+
+
+def validate_certificate(cert, datatype=PEM, dbdir=None):
+    """
+    Perform certificate validation by trying to load it into NSS database
+    """
     try:
-        serial = unicode(get_serial_number(dercert, DER))
-    except NSPRError, nsprerr:
+        load_certificate(cert, datatype=datatype, dbdir=dbdir)
+    except NSPRError as nsprerr:
         if nsprerr.errno == -8183: # SEC_ERROR_BAD_DER
             raise errors.CertificateFormatError(
                 error=_('improperly formatted DER-encoded certificate'))
         else:
             raise errors.CertificateFormatError(error=str(nsprerr))
 
-    return dercert
 
 def write_certificate(rawcert, filename):
     """
-- 
2.4.3
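
For readers skimming the diff, the control flow of the factored-out revocation helper can be sketched in isolation. This is a Python 3 illustration only: the exception classes and the `get_serial`, `cert_show` and `cert_revoke` callables are hypothetical stand-ins for the `errors`, `x509` and `api.Command` pieces used in the patch, and the return value is added here purely to make the behaviour observable.

```python
class CertificateFormatError(Exception):
    """Stand-in for errors.CertificateFormatError."""


class RevokeNotImplemented(Exception):
    """Stand-in for errors.NotImplementedError."""


def revoke_certs(certs, get_serial, cert_show, cert_revoke, log=None):
    """Request revocation of every certificate that is not yet revoked.

    Returns the serials for which revocation was requested.
    """
    revoked = []
    for cert in certs:
        try:
            serial = get_serial(cert)
            if 'revocation_reason' not in cert_show(serial):
                cert_revoke(serial, revocation_reason=4)
                revoked.append(serial)
        except RevokeNotImplemented:
            # Some CAs might not implement revoke.
            pass
        except CertificateFormatError as e:
            # An undecodable certificate is logged and skipped.
            if log is not None:
                log("Problem decoding certificate: %s" % e)
    return revoked


# Stubs for demonstration: the "certificates" are just labelled byte strings.
def get_serial(cert):
    if cert == b"bad":
        raise CertificateFormatError("improperly formatted")
    return int(cert)


def cert_show(serial):
    # Serial 2 is already revoked in this stub.
    return {'revocation_reason': 4} if serial == 2 else {}


calls = []
messages = []
result = revoke_certs(
    [b"1", b"2", b"bad", b"3"],
    get_serial,
    cert_show,
    lambda serial, revocation_reason: calls.append((serial, revocation_reason)),
    log=messages.append,
)
```

A single `try` with two `except` clauses replaces the three nested levels in the removed code, so a decoding failure on one certificate does not stop the loop.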
