URL: https://github.com/freeipa/freeipa/pull/177
Author: frasertweedale
Title: #177: Add options to write lightweight CA cert or chain to file
Action: synchronized

To pull the PR as a Git branch:
git remote add ghfreeipa https://github.com/freeipa/freeipa
git fetch ghfreeipa pull/177/head:pr177
git checkout pr177
From 074d38a611ee4d4edc2afa857563cf0e09527115 Mon Sep 17 00:00:00 2001
From: Fraser Tweedale <ftwee...@redhat.com>
Date: Tue, 16 Aug 2016 13:16:58 +1000
Subject: [PATCH 1/3] Add function for extracting PEM certs from PKCS #7

Add a single function for extracting X.509 certs in PEM format from
a PKCS #7 object.  Refactor sites that execute ``openssl pkcs7`` to
use the new function.

Part of: https://fedorahosted.org/freeipa/ticket/6178
---
 ipalib/x509.py                  | 23 +++++++++++++++++-
 ipapython/certdb.py             |  9 ++-----
 ipaserver/install/cainstance.py | 52 +++++++++++++++--------------------------
 3 files changed, 43 insertions(+), 41 deletions(-)

diff --git a/ipalib/x509.py b/ipalib/x509.py
index e1c3867..caf0ddc 100644
--- a/ipalib/x509.py
+++ b/ipalib/x509.py
@@ -48,7 +48,9 @@
 from ipalib import api
 from ipalib import util
 from ipalib import errors
+from ipaplatform.paths import paths
 from ipapython.dn import DN
+from ipapython import ipautil
 
 if six.PY3:
     unicode = str
@@ -56,7 +58,9 @@
 PEM = 0
 DER = 1
 
-PEM_REGEX = re.compile(r'(?<=-----BEGIN CERTIFICATE-----).*?(?=-----END CERTIFICATE-----)', re.DOTALL)
+PEM_REGEX = re.compile(
+    r'-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----',
+    re.DOTALL)
 
 EKU_SERVER_AUTH = '1.3.6.1.5.5.7.3.1'
 EKU_CLIENT_AUTH = '1.3.6.1.5.5.7.3.2'
@@ -145,6 +149,23 @@ def load_certificate_list_from_file(filename):
         return load_certificate_list(f.read())
 
 
+def pkcs7_to_pems(data, datatype=PEM):
+    """
+    Extract certificates from a PKCS #7 object.
+
+    Return a ``list`` of X.509 PEM strings.
+
+    May throw ``ipautil.CalledProcessError`` on invalid data.
+
+    """
+    cmd = [
+        paths.OPENSSL, "pkcs7", "-print_certs",
+        "-inform", "PEM" if datatype == PEM else "DER",
+    ]
+    result = ipautil.run(cmd, stdin=data, capture_output=True)
+    return PEM_REGEX.findall(result.output)
+
+
 def is_self_signed(certificate, datatype=PEM):
     cert = load_certificate(certificate, datatype)
     return cert.issuer == cert.subject
diff --git a/ipapython/certdb.py b/ipapython/certdb.py
index 5344e37..9b989ef 100644
--- a/ipapython/certdb.py
+++ b/ipapython/certdb.py
@@ -237,13 +237,8 @@ def import_files(self, files, db_password_filename, import_keys=False,
                             continue
 
                     if label in ('PKCS7', 'PKCS #7 SIGNED DATA', 'CERTIFICATE'):
-                        args = [
-                            OPENSSL, 'pkcs7',
-                            '-print_certs',
-                        ]
                         try:
-                            result = ipautil.run(
-                                args, stdin=body, capture_output=True)
+                            certs = x509.pkcs7_to_pems(body)
                         except ipautil.CalledProcessError as e:
                             if label == 'CERTIFICATE':
                                 root_logger.warning(
@@ -255,7 +250,7 @@ def import_files(self, files, db_password_filename, import_keys=False,
                                     filename, line, e)
                             continue
                         else:
-                            extracted_certs += result.output + '\n'
+                            extracted_certs += '\n'.join(certs) + '\n'
                             loaded = True
                             continue
 
diff --git a/ipaserver/install/cainstance.py b/ipaserver/install/cainstance.py
index 505232c..a3751d1 100644
--- a/ipaserver/install/cainstance.py
+++ b/ipaserver/install/cainstance.py
@@ -745,44 +745,30 @@ def __import_ca_chain(self):
         # makes openssl throw up.
         data = base64.b64decode(chain)
 
-        result = ipautil.run(
-            [paths.OPENSSL,
-             "pkcs7",
-             "-inform",
-             "DER",
-             "-print_certs",
-             ], stdin=data, capture_output=True)
-        certlist = result.output
+        certlist = x509.pkcs7_to_pems(data, x509.DER)
 
         # Ok, now we have all the certificates in certs, walk through it
         # and pull out each certificate and add it to our database
 
-        st = 1
-        en = 0
-        subid = 0
         ca_dn = DN(('CN','Certificate Authority'), self.subject_base)
-        while st > 0:
-            st = certlist.find('-----BEGIN', en)
-            en = certlist.find('-----END', en+1)
-            if st > 0:
-                try:
-                    (chain_fd, chain_name) = tempfile.mkstemp()
-                    os.write(chain_fd, certlist[st:en+25])
-                    os.close(chain_fd)
-                    (_rdn, subject_dn) = certs.get_cert_nickname(certlist[st:en+25])
-                    if subject_dn == ca_dn:
-                        nick = get_ca_nickname(self.realm)
-                        trust_flags = 'CT,C,C'
-                    else:
-                        nick = str(subject_dn)
-                        trust_flags = ',,'
-                    self.__run_certutil(
-                        ['-A', '-t', trust_flags, '-n', nick, '-a',
-                         '-i', chain_name]
-                    )
-                finally:
-                    os.remove(chain_name)
-                    subid += 1
+        for cert in certlist:
+            try:
+                chain_fd, chain_name = tempfile.mkstemp()
+                os.write(chain_fd, cert)
+                os.close(chain_fd)
+                (_rdn, subject_dn) = certs.get_cert_nickname(cert)
+                if subject_dn == ca_dn:
+                    nick = get_ca_nickname(self.realm)
+                    trust_flags = 'CT,C,C'
+                else:
+                    nick = str(subject_dn)
+                    trust_flags = ',,'
+                self.__run_certutil(
+                    ['-A', '-t', trust_flags, '-n', nick, '-a',
+                     '-i', chain_name]
+                )
+            finally:
+                os.remove(chain_name)
 
         # Restore NSS trust flags of all previously existing certificates
         for nick, trust_flags in cert_backup_list:

From 3b77890f1684eeea8b0a3a7966711bd3ee382d26 Mon Sep 17 00:00:00 2001
From: Fraser Tweedale <ftwee...@redhat.com>
Date: Fri, 18 Nov 2016 10:04:24 +1000
Subject: [PATCH 2/3] certdb: accumulate extracted certs as list of PEMs

certdb.NSSDatabase.import_files currently accumulates certificates
extracted from input files as a string, which is ugly.  Accumulate a
list of PEMs instead, and join() them just in time for PKCS #12
creation.

Part of: https://fedorahosted.org/freeipa/ticket/6178
---
 ipapython/certdb.py | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/ipapython/certdb.py b/ipapython/certdb.py
index 9b989ef..2aa92fc 100644
--- a/ipapython/certdb.py
+++ b/ipapython/certdb.py
@@ -201,7 +201,7 @@ def import_files(self, files, db_password_filename, import_keys=False,
         """
         key_file = None
         extracted_key = None
-        extracted_certs = ''
+        extracted_certs = []
 
         for filename in files:
             try:
@@ -232,7 +232,7 @@ def import_files(self, files, db_password_filename, import_keys=False,
                                     filename, line, e)
                                 continue
                         else:
-                            extracted_certs += body + '\n'
+                            extracted_certs.append(body)
                             loaded = True
                             continue
 
@@ -250,7 +250,7 @@ def import_files(self, files, db_password_filename, import_keys=False,
                                     filename, line, e)
                             continue
                         else:
-                            extracted_certs += '\n'.join(certs) + '\n'
+                            extracted_certs.extend(certs)
                             loaded = True
                             continue
 
@@ -300,7 +300,7 @@ def import_files(self, files, db_password_filename, import_keys=False,
                 pass
             else:
                 data = x509.make_pem(base64.b64encode(data))
-                extracted_certs += data + '\n'
+                extracted_certs.append(data)
                 continue
 
             # Try to import the file as PKCS#12 file
@@ -341,14 +341,15 @@ def import_files(self, files, db_password_filename, import_keys=False,
             raise RuntimeError(
                 "No server certificates found in %s" % (', '.join(files)))
 
-        certs = x509.load_certificate_list(extracted_certs)
-        for cert in certs:
+        for cert_pem in extracted_certs:
+            cert = x509.load_certificate(cert_pem)
             nickname = str(DN(cert.subject))
             data = cert.public_bytes(serialization.Encoding.DER)
             self.add_cert(data, nickname, ',,')
 
         if extracted_key:
-            in_file = ipautil.write_tmp_file(extracted_certs + extracted_key)
+            in_file = ipautil.write_tmp_file(
+                    '\n'.join(extracted_certs) + '\n' + extracted_key)
             out_file = tempfile.NamedTemporaryFile()
             out_password = ipautil.ipa_generate_password()
             out_pwdfile = ipautil.write_tmp_file(out_password)

From 104e7f303c09b45b7ccb1bf4c256798b436cbc4f Mon Sep 17 00:00:00 2001
From: Fraser Tweedale <ftwee...@redhat.com>
Date: Mon, 8 Aug 2016 14:27:20 +1000
Subject: [PATCH 3/3] Add options to write lightweight CA cert or chain to file

Administrators need a way to retrieve the certificate or certificate
chain of an IPA-managed lightweight CA.  Add params to the `ca'
object for carrying the CA certificate and chain (as multiple DER
values).  Add the `--chain' flag for including the chain in the
result (chain is also included with `--all').  Add the
`--certificate-out' option for writing the certificate to a file (or
the chain, if `--chain' was given).

Fixes: https://fedorahosted.org/freeipa/ticket/6178
---
 ipaclient/plugins/ca.py                   | 53 +++++++++++++++++++++++++++
 ipaserver/plugins/ca.py                   | 60 ++++++++++++++++++++++++++++---
 ipaserver/plugins/dogtag.py               | 12 +++++++
 ipatests/test_xmlrpc/tracker/ca_plugin.py | 31 ++++++++++++----
 ipatests/test_xmlrpc/xmlrpc_test.py       | 17 +++++++++
 5 files changed, 162 insertions(+), 11 deletions(-)
 create mode 100644 ipaclient/plugins/ca.py

diff --git a/ipaclient/plugins/ca.py b/ipaclient/plugins/ca.py
new file mode 100644
index 0000000..fcdf484
--- /dev/null
+++ b/ipaclient/plugins/ca.py
@@ -0,0 +1,53 @@
+#
+# Copyright (C) 2016  FreeIPA Contributors see COPYING for license
+#
+
+import base64
+from ipaclient.frontend import MethodOverride
+from ipalib import util, x509, Str
+from ipalib.plugable import Registry
+from ipalib.text import _
+
+register = Registry()
+
+
+class WithCertOutArgs(MethodOverride):
+
+    takes_options = (
+        Str(
+            'certificate_out?',
+            doc=_('Write certificate (chain if --chain used) to file'),
+            include='cli',
+            cli_metavar='FILE',
+        ),
+    )
+
+    def forward(self, *keys, **options):
+        filename = None
+        if 'certificate_out' in options:
+            filename = options.pop('certificate_out')
+            util.check_writable_file(filename)
+
+        result = super(WithCertOutArgs, self).forward(*keys, **options)
+        if filename:
+            def to_pem(x):
+                return x509.make_pem(x)
+            if options.get('chain', False):
+                ders = result['result']['certificate_chain']
+                data = '\n'.join(to_pem(base64.b64encode(der)) for der in ders)
+            else:
+                data = to_pem(result['result']['certificate'])
+            with open(filename, 'wb') as f:
+                f.write(data)
+
+        return result
+
+
+@register(override=True, no_fail=True)
+class ca_add(WithCertOutArgs):
+    pass
+
+
+@register(override=True, no_fail=True)
+class ca_show(WithCertOutArgs):
+    pass
diff --git a/ipaserver/plugins/ca.py b/ipaserver/plugins/ca.py
index d9ae8c8..8165a74 100644
--- a/ipaserver/plugins/ca.py
+++ b/ipaserver/plugins/ca.py
@@ -2,14 +2,18 @@
 # Copyright (C) 2016  FreeIPA Contributors see COPYING for license
 #
 
-from ipalib import api, errors, output, DNParam, Str
+import base64
+
+import six
+
+from ipalib import api, errors, output, Bytes, DNParam, Flag, Str
 from ipalib.constants import IPA_CA_CN
 from ipalib.plugable import Registry
 from ipaserver.plugins.baseldap import (
     LDAPObject, LDAPSearch, LDAPCreate, LDAPDelete,
     LDAPUpdate, LDAPRetrieve, LDAPQuery, pkey_to_value)
 from ipaserver.plugins.cert import ca_enabled_check
-from ipalib import _, ngettext
+from ipalib import _, ngettext, x509
 
 
 __doc__ = _("""
@@ -100,6 +104,18 @@ class ca(LDAPObject):
             doc=_('Issuer Distinguished Name'),
             flags=['no_create', 'no_update'],
         ),
+        Bytes(
+            'certificate',
+            label=_("Certificate"),
+            doc=_("Base-64 encoded certificate."),
+            flags={'no_create', 'no_update', 'no_search'},
+        ),
+        Bytes(
+            'certificate_chain*',
+            label=_("Certificate chain"),
+            doc=_("X.509 certificate chain"),
+            flags={'no_create', 'no_update', 'no_search'},
+        ),
     )
 
     permission_filter_objectclasses = ['ipaca']
@@ -145,6 +161,20 @@ class ca(LDAPObject):
     }
 
 
+def set_certificate_attrs(entry, options):
+    ca_id = entry['ipacaid'][0]
+    full = options.get('all', False)
+    with api.Backend.ra_lightweight_ca as ca_api:
+        der = ca_api.read_ca_cert(entry['ipacaid'][0])
+        entry['certificate'] = six.text_type(base64.b64encode(der))
+
+        if options.get('chain', False) or full:
+            pkcs7_der = ca_api.read_ca_chain(ca_id)
+            pems = x509.pkcs7_to_pems(pkcs7_der, x509.DER)
+            ders = [x509.normalize_certificate(pem) for pem in pems]
+            entry['certificate_chain'] = ders
+
+
 @register()
 class ca_find(LDAPSearch):
     __doc__ = _("Search for CAs.")
@@ -161,9 +191,19 @@ def execute(self, *keys, **options):
 class ca_show(LDAPRetrieve):
     __doc__ = _("Display the properties of a CA.")
 
-    def execute(self, *args, **kwargs):
+    takes_options = LDAPRetrieve.takes_options + (
+        Flag(
+            'chain',
+            default=False,
+            doc=_('Include certificate chain in output'),
+        ),
+    )
+
+    def execute(self, *keys, **options):
         ca_enabled_check()
-        return super(ca_show, self).execute(*args, **kwargs)
+        result = super(ca_show, self).execute(*keys, **options)
+        set_certificate_attrs(result['result'], options)
+        return result
 
 
 @register()
@@ -171,6 +211,14 @@ class ca_add(LDAPCreate):
     __doc__ = _("Create a CA.")
     msg_summary = _('Created CA "%(value)s"')
 
+    takes_options = LDAPCreate.takes_options + (
+        Flag(
+            'chain',
+            default=False,
+            doc=_('Include certificate chain in output'),
+        ),
+    )
+
     def pre_callback(self, ldap, dn, entry, entry_attrs, *keys, **options):
         ca_enabled_check()
         if not ldap.can_add(dn[1:]):
@@ -203,6 +251,10 @@ def pre_callback(self, ldap, dn, entry, entry_attrs, *keys, **options):
         entry['ipacasubjectdn'] = [resp['dn']]
         return dn
 
+    def post_callback(self, ldap, dn, entry_attrs, *keys, **options):
+        set_certificate_attrs(entry_attrs, options)
+        return dn
+
 
 @register()
 class ca_del(LDAPDelete):
diff --git a/ipaserver/plugins/dogtag.py b/ipaserver/plugins/dogtag.py
index 0bdb4da..b77b21a 100644
--- a/ipaserver/plugins/dogtag.py
+++ b/ipaserver/plugins/dogtag.py
@@ -2125,6 +2125,18 @@ def read_ca(self, ca_id):
         except:
             raise errors.RemoteRetrieveError(reason=_("Response from CA was not valid JSON"))
 
+    def read_ca_cert(self, ca_id):
+        _status, _resp_headers, resp_body = self._ssldo(
+            'GET', '{}/cert'.format(ca_id),
+            headers={'Accept': 'application/pkix-cert'})
+        return resp_body
+
+    def read_ca_chain(self, ca_id):
+        _status, _resp_headers, resp_body = self._ssldo(
+            'GET', '{}/chain'.format(ca_id),
+            headers={'Accept': 'application/pkcs7-mime'})
+        return resp_body
+
     def disable_ca(self, ca_id):
         self._ssldo(
             'POST', ca_id + '/disable',
diff --git a/ipatests/test_xmlrpc/tracker/ca_plugin.py b/ipatests/test_xmlrpc/tracker/ca_plugin.py
index ec58c28..e18b1c1 100644
--- a/ipatests/test_xmlrpc/tracker/ca_plugin.py
+++ b/ipatests/test_xmlrpc/tracker/ca_plugin.py
@@ -8,7 +8,13 @@
 from ipapython.dn import DN
 from ipatests.test_xmlrpc.tracker.base import Tracker
 from ipatests.util import assert_deepequal
-from ipatests.test_xmlrpc.xmlrpc_test import fuzzy_issuer, fuzzy_caid
+from ipatests.test_xmlrpc.xmlrpc_test import (
+    fuzzy_issuer,
+    fuzzy_caid,
+    fuzzy_base64,
+    fuzzy_sequence_of,
+    fuzzy_bytes,
+)
 from ipatests.test_xmlrpc import objectclasses
 
 
@@ -19,12 +25,21 @@
 class CATracker(Tracker):
     """Implementation of a Tracker class for CA plugin."""
 
-    retrieve_keys = {
+    ldap_keys = {
         'dn', 'cn', 'ipacaid', 'ipacasubjectdn', 'ipacaissuerdn', 'description'
     }
-    retrieve_all_keys = {'objectclass'} | retrieve_keys
-    create_keys = retrieve_all_keys
-    update_keys = retrieve_keys - {'dn'}
+    cert_keys = {
+        'certificate',
+    }
+    cert_all_keys = {
+        'certificate_chain',
+    }
+    find_keys = ldap_keys
+    find_all_keys = {'objectclass'} | ldap_keys
+    retrieve_keys = ldap_keys | cert_keys
+    retrieve_all_keys = {'objectclass'} | retrieve_keys | cert_all_keys
+    create_keys = {'objectclass'} | retrieve_keys
+    update_keys = ldap_keys - {'dn'}
 
     def __init__(self, name, subject, desc=u"Test generated CA",
                  default_version=None):
@@ -59,6 +74,8 @@ def track_create(self):
             ipacasubjectdn=[self.ipasubjectdn],
             ipacaissuerdn=[fuzzy_issuer],
             ipacaid=[fuzzy_caid],
+            certificate=fuzzy_base64,
+            certificate_chain=fuzzy_sequence_of(fuzzy_bytes),
             objectclass=objectclasses.ca
         )
         self.exists = True
@@ -102,9 +119,9 @@ def make_find_command(self, *args, **kwargs):
     def check_find(self, result, all=False, raw=False):
         """Check the plugin's `find` command result"""
         if all:
-            expected = self.filter_attrs(self.retrieve_all_keys)
+            expected = self.filter_attrs(self.find_all_keys)
         else:
-            expected = self.filter_attrs(self.retrieve_keys)
+            expected = self.filter_attrs(self.find_keys)
 
         assert_deepequal(dict(
             count=1,
diff --git a/ipatests/test_xmlrpc/xmlrpc_test.py b/ipatests/test_xmlrpc/xmlrpc_test.py
index 0ce1245..67565b0 100644
--- a/ipatests/test_xmlrpc/xmlrpc_test.py
+++ b/ipatests/test_xmlrpc/xmlrpc_test.py
@@ -22,6 +22,7 @@
 """
 from __future__ import print_function
 
+import collections
 import datetime
 import inspect
 
@@ -49,6 +50,20 @@
     '^cn=%s,cn=automember rebuild membership,cn=tasks,cn=config$' % uuid_re
 )
 
+# base64-encoded value
+fuzzy_base64 = Fuzzy('^[0-9A-Za-z/+]+={0,2}$')
+
+
+def fuzzy_sequence_of(fuzzy):
+    """Construct a Fuzzy for a Sequence of values matching the given Fuzzy."""
+    def test(xs):
+        if not isinstance(xs, collections.Sequence):
+            return False
+        else:
+            return all(fuzzy == x for x in xs)
+
+    return Fuzzy(test=test)
+
 # Matches an automember task finish message
 fuzzy_automember_message = Fuzzy(
     '^Automember rebuild task finished\. Processed \(\d+\) entries\.$'
@@ -109,6 +124,8 @@
 # match any string
 fuzzy_string = Fuzzy(type=six.string_types)
 
+fuzzy_bytes = Fuzzy(type=bytes)
+
 # case insensitive match of sets
 def fuzzy_set_ci(s):
     return Fuzzy(test=lambda other: set(x.lower() for x in other) == set(y.lower() for y in s))
-- 
Manage your subscription for the Freeipa-devel mailing list:
https://www.redhat.com/mailman/listinfo/freeipa-devel
Contribute to FreeIPA: http://www.freeipa.org/page/Contribute/Code

Reply via email to