URL: https://github.com/freeipa/freeipa/pull/177
Author: frasertweedale
 Title: #177: Add options to write lightweight CA cert or chain to file
Action: opened

PR body:
"""
Administrators need a way to retrieve the certificate or certificate
chain of an IPA-managed lightweight CA.  Add params to the `ca`
object for carrying the CA certificate and chain (as multiple DER
values), and add the `--certificate-out` option and `--chain` flag
as client-side options for writing one or the other to a file.

Fixes: https://fedorahosted.org/freeipa/ticket/6178
"""

To pull the PR as Git branch:
git remote add ghfreeipa https://github.com/freeipa/freeipa
git fetch ghfreeipa pull/177/head:pr177
git checkout pr177
From 7b9618966398748a448af51b42c86826d8e73a06 Mon Sep 17 00:00:00 2001
From: Fraser Tweedale <ftwee...@redhat.com>
Date: Tue, 16 Aug 2016 13:16:58 +1000
Subject: [PATCH 1/2] Add function for extracting PEM certs from PKCS #7

Add a single function for extracting X.509 certs in PEM format from
a PKCS #7 object.  Refactor sites that execute ``openssl pkcs7`` to
use the new function.

Part of: https://fedorahosted.org/freeipa/ticket/6178
---
 ipalib/x509.py                  | 23 +++++++++++++++++-
 ipapython/certdb.py             |  9 ++-----
 ipaserver/install/cainstance.py | 52 +++++++++++++++--------------------------
 3 files changed, 43 insertions(+), 41 deletions(-)

diff --git a/ipalib/x509.py b/ipalib/x509.py
index e986a97..0461553 100644
--- a/ipalib/x509.py
+++ b/ipalib/x509.py
@@ -51,11 +51,14 @@
 from ipalib import errors
 from ipaplatform.paths import paths
 from ipapython.dn import DN
+from ipapython import ipautil
 
 PEM = 0
 DER = 1
 
-PEM_REGEX = re.compile(r'(?<=-----BEGIN CERTIFICATE-----).*?(?=-----END CERTIFICATE-----)', re.DOTALL)
+PEM_REGEX = re.compile(
+    r'-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----',
+    re.DOTALL)
 
 EKU_SERVER_AUTH = '1.3.6.1.5.5.7.3.1'
 EKU_CLIENT_AUTH = '1.3.6.1.5.5.7.3.2'
@@ -148,6 +151,24 @@ def load_certificate_list(data, dbdir=None):
     certs = [load_certificate(cert, PEM, dbdir) for cert in certs]
     return certs
 
+
+def pkcs7_to_pems(data, datatype=PEM):
+    """
+    Extract certificates from a PKCS #7 object using ``openssl pkcs7``.
+
+    ``data`` is the PKCS #7 input, parsed as ``PEM`` or ``DER`` per
+    ``datatype``.  Return a ``list`` of X.509 PEM strings, each
+    including its BEGIN/END CERTIFICATE lines.  May raise
+    ``ipautil.CalledProcessError`` on invalid data.
+    """
+    cmd = [
+        paths.OPENSSL, "pkcs7", "-print_certs",
+        "-inform", "PEM" if datatype == PEM else "DER",
+    ]
+    result = ipautil.run(cmd, stdin=data, capture_output=True)
+    return PEM_REGEX.findall(result.output)
+
+
 def load_certificate_list_from_file(filename, dbdir=None):
     """
     Load a certificate list from a PEM file.
diff --git a/ipapython/certdb.py b/ipapython/certdb.py
index 06666c0..49c2613 100644
--- a/ipapython/certdb.py
+++ b/ipapython/certdb.py
@@ -270,13 +270,8 @@ def import_files(self, files, db_password_filename, import_keys=False,
                             continue
 
                     if label in ('PKCS7', 'PKCS #7 SIGNED DATA', 'CERTIFICATE'):
-                        args = [
-                            paths.OPENSSL, 'pkcs7',
-                            '-print_certs',
-                        ]
                         try:
-                            result = ipautil.run(
-                                args, stdin=body, capture_output=True)
+                            certs = x509.pkcs7_to_pems(body)
                         except ipautil.CalledProcessError as e:
                             if label == 'CERTIFICATE':
                                 root_logger.warning(
@@ -288,7 +283,7 @@ def import_files(self, files, db_password_filename, import_keys=False,
                                     filename, line, e)
                             continue
                         else:
-                            extracted_certs += result.output + '\n'
+                            extracted_certs += '\n'.join(certs) + '\n'
                             loaded = True
                             continue
 
diff --git a/ipaserver/install/cainstance.py b/ipaserver/install/cainstance.py
index 384abc3..7d6a956 100644
--- a/ipaserver/install/cainstance.py
+++ b/ipaserver/install/cainstance.py
@@ -847,44 +847,30 @@ def __import_ca_chain(self):
         # makes openssl throw up.
         data = base64.b64decode(chain)
 
-        result = ipautil.run(
-            [paths.OPENSSL,
-             "pkcs7",
-             "-inform",
-             "DER",
-             "-print_certs",
-             ], stdin=data, capture_output=True)
-        certlist = result.output
+        certlist = x509.pkcs7_to_pems(data, x509.DER)
 
         # Ok, now we have all the certificates in certs, walk through it
         # and pull out each certificate and add it to our database
 
-        st = 1
-        en = 0
-        subid = 0
         ca_dn = DN(('CN','Certificate Authority'), self.subject_base)
-        while st > 0:
-            st = certlist.find('-----BEGIN', en)
-            en = certlist.find('-----END', en+1)
-            if st > 0:
-                try:
-                    (chain_fd, chain_name) = tempfile.mkstemp()
-                    os.write(chain_fd, certlist[st:en+25])
-                    os.close(chain_fd)
-                    (_rdn, subject_dn) = certs.get_cert_nickname(certlist[st:en+25])
-                    if subject_dn == ca_dn:
-                        nick = get_ca_nickname(self.realm)
-                        trust_flags = 'CT,C,C'
-                    else:
-                        nick = str(subject_dn)
-                        trust_flags = ',,'
-                    self.__run_certutil(
-                        ['-A', '-t', trust_flags, '-n', nick, '-a',
-                         '-i', chain_name]
-                    )
-                finally:
-                    os.remove(chain_name)
-                    subid += 1
+        for cert in certlist:
+            (chain_fd, chain_name) = tempfile.mkstemp()
+            try:
+                with os.fdopen(chain_fd, 'w') as chain_file:
+                    chain_file.write(cert)
+                (_rdn, subject_dn) = certs.get_cert_nickname(cert)
+                if subject_dn == ca_dn:
+                    nick = get_ca_nickname(self.realm)
+                    trust_flags = 'CT,C,C'
+                else:
+                    nick = str(subject_dn)
+                    trust_flags = ',,'
+                self.__run_certutil(
+                    ['-A', '-t', trust_flags, '-n', nick, '-a',
+                     '-i', chain_name]
+                )
+            finally:
+                os.remove(chain_name)  # bound: mkstemp precedes try
 
         # Restore NSS trust flags of all previously existing certificates
         for nick, trust_flags in cert_backup_list:

From 0804716a825d5c9e2d3fc69f5961c63b962b239e Mon Sep 17 00:00:00 2001
From: Fraser Tweedale <ftwee...@redhat.com>
Date: Mon, 8 Aug 2016 14:27:20 +1000
Subject: [PATCH 2/2] Add options to write lightweight CA cert or chain to file

Administrators need a way to retrieve the certificate or certificate
chain of an IPA-managed lightweight CA.  Add params to the `ca'
object for carrying the CA certificate and chain (as multiple DER
values), and add the `--certificate-out' option and `--chain' flag
as client-side options for writing one or the other to a file.

Fixes: https://fedorahosted.org/freeipa/ticket/6178
---
 ipaclient/plugins/ca.py     | 61 ++++++++++++++++++++++++++++++++++++++++++
 ipaserver/plugins/ca.py     | 65 ++++++++++++++++++++++++++++++++++++++++-----
 ipaserver/plugins/dogtag.py | 12 +++++++++
 3 files changed, 131 insertions(+), 7 deletions(-)
 create mode 100644 ipaclient/plugins/ca.py

diff --git a/ipaclient/plugins/ca.py b/ipaclient/plugins/ca.py
new file mode 100644
index 0000000..bd1d2b5
--- /dev/null
+++ b/ipaclient/plugins/ca.py
@@ -0,0 +1,61 @@
+#
+# Copyright (C) 2016  FreeIPA Contributors see COPYING for license
+#
+
+import base64
+from ipaclient.frontend import MethodOverride
+from ipalib import util, x509, Flag, Str
+from ipalib.plugable import Registry
+from ipalib.text import _
+
+register = Registry()
+
+
+class WithCertOutArgs(MethodOverride):
+    # Shared override: adds client-side certificate_out/chain options.
+    takes_options = (
+        Str(
+            'certificate_out?',
+            doc=_('Write certificate to file'),
+            include='cli',
+        ),
+        Flag(
+            'chain',
+            default=False,
+            doc=_('Write certificate chain instead of single certificate'),
+            include='cli',
+        ),
+    )
+
+    def forward(self, *keys, **options):
+        filename = None
+        if 'certificate_out' in options:
+            filename = options.pop('certificate_out')
+            util.check_writable_file(filename)  # checked before forwarding
+        chain = options.pop('chain', False)
+        if chain:
+            options['all'] = True  # chain is only returned with 'all'
+        # certificate_out/chain were popped above and are not forwarded
+        result = super(WithCertOutArgs, self).forward(*keys, **options)
+        if filename:
+            def to_pem(x):
+                return x509.make_pem(x)
+            if chain:
+                ders = result['result']['certificate_chain']  # DER values
+                data = '\n'.join(to_pem(base64.b64encode(der)) for der in ders)
+            else:
+                data = to_pem(result['result']['certificate'])  # base64 text
+            with open(filename, 'wb') as f:  # NOTE(review): py3 needs bytes
+                f.write(data)
+
+        return result
+
+
+@register(override=True, no_fail=True)
+class ca_add(WithCertOutArgs):
+    pass  # all behaviour comes from WithCertOutArgs
+
+
+@register(override=True, no_fail=True)
+class ca_show(WithCertOutArgs):
+    pass  # all behaviour comes from WithCertOutArgs
diff --git a/ipaserver/plugins/ca.py b/ipaserver/plugins/ca.py
index d9ae8c8..89fe88c 100644
--- a/ipaserver/plugins/ca.py
+++ b/ipaserver/plugins/ca.py
@@ -2,14 +2,21 @@
 # Copyright (C) 2016  FreeIPA Contributors see COPYING for license
 #
 
-from ipalib import api, errors, output, DNParam, Str
+import base64
+
+import six
+
+from ipalib import api, errors, output, Bytes, DNParam, Str
 from ipalib.constants import IPA_CA_CN
 from ipalib.plugable import Registry
 from ipaserver.plugins.baseldap import (
     LDAPObject, LDAPSearch, LDAPCreate, LDAPDelete,
     LDAPUpdate, LDAPRetrieve, LDAPQuery, pkey_to_value)
-from ipaserver.plugins.cert import ca_enabled_check
-from ipalib import _, ngettext
+from ipaserver.plugins.cert import BaseCertObject, ca_enabled_check
+from ipalib import _, ngettext, x509
+
+if six.PY3:
+    unicode = str
 
 
 __doc__ = _("""
@@ -53,7 +60,7 @@
 
 
 @register()
-class ca(LDAPObject):
+class ca(LDAPObject, BaseCertObject):
     """
     Lightweight CA Object
     """
@@ -70,7 +77,12 @@ class ca(LDAPObject):
     label = _('Certificate Authorities')
     label_singular = _('Certificate Authority')
 
-    takes_params = (
+    takes_params = tuple(
+        # filter out attrs with an equivalent defined as part of
+        # this LDAP object
+        option for option in BaseCertObject.takes_params
+        if option.name not in {'cacn', 'subject', 'issuer'}
+    ) + (
         Str('cn',
             primary_key=True,
             cli_name='name',
@@ -100,6 +112,12 @@ class ca(LDAPObject):
             doc=_('Issuer Distinguished Name'),
             flags=['no_create', 'no_update'],
         ),
+        Bytes(
+            'certificate_chain*',
+            label=_("Certificate chain"),
+            doc=_("X.509 certificate chain"),
+            flags={'no_create', 'no_update', 'no_search'},
+        ),
     )
 
     permission_filter_objectclasses = ['ipaca']
@@ -144,6 +162,33 @@ class ca(LDAPObject):
         },
     }
 
+    def _parse(self, obj, full=True):
+        """
+        Run the inherited ``_parse`` on ``obj``, then delete the
+        ``issuer`` and ``subject`` keys: ``BaseCertObject`` defines
+        them but the ``ca`` object does not use them.
+        """
+        super(ca, self)._parse(obj, full)
+        for k in {'issuer', 'subject'}:
+            if k in obj:
+                del obj[k]
+
+
+def set_certificate_attrs(entry, options):
+    ca_id = entry['ipacaid'][0]
+    full = options.get('all', False)
+    with api.Backend.ra_lightweight_ca as ca_api:
+        der = ca_api.read_ca_cert(ca_id)
+        entry['certificate'] = base64.b64encode(der).decode('ascii')
+        if not options.get('raw', False):
+            api.Object.ca._parse(entry, full)
+        # the chain needs a second request, so fetch it only for 'all'
+        if full:
+            pkcs7_der = ca_api.read_ca_chain(ca_id)
+            pems = x509.pkcs7_to_pems(pkcs7_der, x509.DER)
+            ders = [x509.normalize_certificate(pem) for pem in pems]
+            entry['certificate_chain'] = ders
+
 
 @register()
 class ca_find(LDAPSearch):
@@ -161,9 +206,11 @@ def execute(self, *keys, **options):
 class ca_show(LDAPRetrieve):
     __doc__ = _("Display the properties of a CA.")
 
-    def execute(self, *args, **kwargs):
+    def execute(self, *keys, **options):
         ca_enabled_check()
-        return super(ca_show, self).execute(*args, **kwargs)
+        result = super(ca_show, self).execute(*keys, **options)
+        set_certificate_attrs(result['result'], options)
+        return result
 
 
 @register()
@@ -203,6 +250,10 @@ def pre_callback(self, ldap, dn, entry, entry_attrs, *keys, **options):
         entry['ipacasubjectdn'] = [resp['dn']]
         return dn
 
+    def post_callback(self, ldap, dn, entry_attrs, *keys, **options):
+        set_certificate_attrs(entry_attrs, options)  # attach CA cert data
+        return dn
+
 
 @register()
 class ca_del(LDAPDelete):
diff --git a/ipaserver/plugins/dogtag.py b/ipaserver/plugins/dogtag.py
index 0bdb4da..9bcda45 100644
--- a/ipaserver/plugins/dogtag.py
+++ b/ipaserver/plugins/dogtag.py
@@ -2125,6 +2125,18 @@ def read_ca(self, ca_id):
         except:
             raise errors.RemoteRetrieveError(reason=_("Response from CA was not valid JSON"))
 
+    def read_ca_cert(self, ca_id):
+        _status, _resp_headers, resp_body = self._ssldo(
+            'GET', '{}/cert'.format(ca_id),
+            headers={'Accept': 'application/pkix-cert'})
+        return resp_body  # DER-encoded certificate
+
+    def read_ca_chain(self, ca_id):
+        _status, _resp_headers, resp_body = self._ssldo(
+            'GET', '{}/chain'.format(ca_id),
+            headers={'Accept': 'application/pkcs7-mime'})
+        return resp_body  # PKCS #7 data
+
     def disable_ca(self, ca_id):
         self._ssldo(
             'POST', ca_id + '/disable',
-- 
Manage your subscription for the Freeipa-devel mailing list:
https://www.redhat.com/mailman/listinfo/freeipa-devel
Contribute to FreeIPA: http://www.freeipa.org/page/Contribute/Code

Reply via email to