On 6.4.2016 15:46, Pavel Vomacka wrote:


On 03/16/2016 01:50 PM, Jan Cholasta wrote:
Hi,

the attached patches implement the server-side part of
<https://fedorahosted.org/freeipa/ticket/5381>.

Honza

Hi,

thank you for the patches. I tested them and they work well. But I would
like to ask you whether would be possible to extend the response of
'basecert_find' method and probably also 'basecert_show' response. I
think of these information:

1) information whether the certificate is issued by our CA or not.

You can check for that by comparing the issuer name of the certificate to "CN=Certificate Authority,$SUBJECT_BASE". You can get subject base from config-show.


2) this probably wouldn't be possible (as we discussed), but I rather
write it too - the information about revocation reason. The same as the
'cert_show' provides.

Added --check-revocation flag to request this information. Currently it works only on certificates issued by our CA.


3) MD5 and SHA1 fingerprints as the 'cert_show' method returns

Added, also included SHA-256.


Thank you again.

Updated patches attached.

--
Jan Cholasta
From 1fa86fdec1d4adf0113038d78503e110af84e64b Mon Sep 17 00:00:00 2001
From: Jan Cholasta <jchol...@redhat.com>
Date: Wed, 16 Mar 2016 13:09:11 +0100
Subject: [PATCH 1/2] ldap: fix handling of binary data in search filters

This fixes a UnicodeDecodeError when passing non-UTF-8 binary data to
LDAPClient.make_filter() and friends.

https://fedorahosted.org/freeipa/ticket/5381
---
 ipapython/ipaldap.py | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/ipapython/ipaldap.py b/ipapython/ipaldap.py
index 405f1ee..eb56ad2 100644
--- a/ipapython/ipaldap.py
+++ b/ipapython/ipaldap.py
@@ -1208,7 +1208,12 @@ class LDAPClient(object):
             ]
             return self.combine_filters(flts, rules)
         elif value is not None:
-            value = ldap.filter.escape_filter_chars(value_to_utf8(value))
+            if isinstance(value, bytes):
+                if six.PY3:
+                    value = value.decode('raw_unicode_escape')
+            else:
+                value = value_to_utf8(value)
+            value = ldap.filter.escape_filter_chars(value)
             if not exact:
                 template = '%s'
                 if leading_wildcard:
-- 
2.5.5

From aae7d41b8e547ddb360d8b88cf2c572b08c82ddd Mon Sep 17 00:00:00 2001
From: Jan Cholasta <jchol...@redhat.com>
Date: Wed, 16 Mar 2016 13:11:05 +0100
Subject: [PATCH 2/2] api: add basecert plugins

Introduces basecert-show and basecert-find commands for getting
information about certificates.

The commands allow querying for a specific certificate or all certificates
owned by a specific user, host or service and return selected fields
extracted from the certificate as well as owner information.

https://fedorahosted.org/freeipa/ticket/5381
---
 API.txt                    |  36 ++++
 VERSION                    |   4 +-
 ipalib/plugins/basecert.py | 497 +++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 535 insertions(+), 2 deletions(-)
 create mode 100644 ipalib/plugins/basecert.py

diff --git a/API.txt b/API.txt
index 3598b08..23841ae 100644
--- a/API.txt
+++ b/API.txt
@@ -444,6 +444,42 @@ option: Str('version?', exclude='webui')
 output: Entry('result', <type 'dict'>, Gettext('A dictionary representing an LDAP entry', domain='ipa', localedir=None))
 output: Output('summary', (<type 'unicode'>, <type 'NoneType'>), None)
 output: PrimaryKey('value', None, None)
+command: basecert_find
+args: 1,16,4
+arg: Str('criteria?', noextrawhitespace=False)
+option: Flag('all', autofill=True, cli_name='all', default=False, exclude='webui')
+option: Bytes('certificate', attribute=True, autofill=False, cli_name='certificate', multivalue=False, primary_key=True, query=True, required=False)
+option: Flag('check_revocation', autofill=True, default=False)
+option: File('file?', include='cli')
+option: Str('host*', cli_name='hosts', csv=True)
+option: Str('no_host*', cli_name='no_hosts', csv=True)
+option: Flag('no_members', autofill=True, default=False, exclude='webui')
+option: Str('no_service*', cli_name='no_services', csv=True)
+option: Str('no_user*', cli_name='no_users', csv=True)
+option: Flag('pkey_only?', autofill=True, default=False)
+option: Flag('raw', autofill=True, cli_name='raw', default=False, exclude='webui')
+option: Str('service*', cli_name='services', csv=True)
+option: Int('sizelimit?', autofill=False, minvalue=0)
+option: Int('timelimit?', autofill=False, minvalue=0)
+option: Str('user*', cli_name='users', csv=True)
+option: Str('version?', exclude='webui')
+output: Output('count', <type 'int'>, None)
+output: ListOfEntries('result', (<type 'list'>, <type 'tuple'>), Gettext('A list of LDAP entries', domain='ipa', localedir=None))
+output: Output('summary', (<type 'unicode'>, <type 'NoneType'>), None)
+output: Output('truncated', <type 'bool'>, None)
+command: basecert_show
+args: 1,7,3
+arg: File('file?', include='cli')
+option: Flag('all', autofill=True, cli_name='all', default=False, exclude='webui')
+option: Bytes('certificate', attribute=True, cli_name='certificate', multivalue=False, primary_key=True, query=True, required=False)
+option: Flag('check_revocation', autofill=True, default=False)
+option: Flag('no_members', autofill=True, default=False, exclude='webui')
+option: Str('out?', include='cli')
+option: Flag('raw', autofill=True, cli_name='raw', default=False, exclude='webui')
+option: Str('version?', exclude='webui')
+output: Entry('result', <type 'dict'>, Gettext('A dictionary representing an LDAP entry', domain='ipa', localedir=None))
+output: Output('summary', (<type 'unicode'>, <type 'NoneType'>), None)
+output: PrimaryKey('value', None, None)
 command: batch
 args: 1,1,2
 arg: Any('methods*')
diff --git a/VERSION b/VERSION
index aedebd1..6e6d444 100644
--- a/VERSION
+++ b/VERSION
@@ -90,5 +90,5 @@ IPA_DATA_VERSION=20100614120000
 #                                                      #
 ########################################################
 IPA_API_VERSION_MAJOR=2
-IPA_API_VERSION_MINOR=165
-# Last change: mbasti - limit ipamaxusernamelength value to 255
+IPA_API_VERSION_MINOR=166
+# Last change: jcholast - api: add basecert plugins
diff --git a/ipalib/plugins/basecert.py b/ipalib/plugins/basecert.py
new file mode 100644
index 0000000..f8b2004
--- /dev/null
+++ b/ipalib/plugins/basecert.py
@@ -0,0 +1,497 @@
+#
+# Copyright (C) 2016  FreeIPA Contributors see COPYING for license
+#
+
+import base64
+import datetime
+import hashlib
+
+import six
+from nss import nss
+
+from ipalib import Method, Object, Retrieve, Search, _, errors, messages, x509
+from ipalib.capabilities import client_has_capability
+from ipalib.parameters import (
+    Bool, Bytes, DateTime, DNParam, File, Flag, Int, Str)
+from ipalib.plugable import Registry
+from ipapython.dn import DN
+
+__doc__ = _("""
+Certificates
+""") + _("""
+Get information about certificates.
+""") + _("""
+EXAMPLES:
+""") + _("""
+ Examine a certificate:
+   ipa basecert-show cert.pem
+""") + _("""
+ Find certificates owned by a specific user:
+   ipa basecert-find --user=user
+""") + _("""
+ Verify that a certificate is owner by a specific user:
+   ipa basecert-find --file=cert.pem --user=user
+""")
+
+if six.PY3:
+    unicode = str
+
+register = Registry()
+
+_OWNERS = ('user', 'host', 'service')
+
+
+def _conv_validity(value):
+    """
+    Convert from microseconds since epoch to datetime object.
+    """
+    value = int(value)
+    value = datetime.datetime.utcfromtimestamp(value // 1000000)
+    return value
+
+
+@register()
+class basecert(Object):
+    """
+    Certificate object.
+    """
+    takes_params = (
+        Bytes(
+            'certificate',
+            label=_("Certificate"),
+            doc=_("Base-64 encoded certificate."),
+            primary_key=True,
+            normalizer=x509.normalize_certificate,
+        ),
+        Int(
+            'serial_number',
+            label=_("Serial Number"),
+            flags={'no_search'},
+        ),
+        DNParam(
+            'issuer',
+            label=_("Issuer"),
+            flags={'no_search'},
+        ),
+        DateTime(
+            'valid_not_before',
+            label=_("Not Before"),
+            flags={'no_search'},
+        ),
+        DateTime(
+            'valid_not_after',
+            label=_("Not After"),
+            flags={'no_search'},
+        ),
+        DNParam(
+            'subject',
+            label=_("Subject"),
+            flags={'no_search'},
+        ),
+        Str(
+            'md5_fingerprint',
+            label=_('Fingerprint (MD5)'),
+            flags={'no_search'},
+        ),
+        Str(
+            'sha1_fingerprint',
+            label=_('Fingerprint (SHA1)'),
+            flags={'no_search'},
+        ),
+        Str(
+            'sha256_fingerprint',
+            label=_('Fingerprint (SHA-256)'),
+            flags={'no_search'},
+        ),
+        DNParam(
+            'owner*',
+            flags={'no_search'},
+        ),
+        Bool(
+            'revoked',
+            label=_('Revoked'),
+            flags={'no_search'},
+        ),
+        Int(
+            'revocation_reason?',
+            label=_('Revocation reason'),
+            flags={'no_search'},
+        ),
+    )
+
+    def get_params(self):
+        for param in super(basecert, self).get_params():
+            yield param
+
+        for owner in _OWNERS:
+            name = self.api.Object[owner].object_name
+            yield Str(
+                'owner_{0}*'.format(owner),
+                label=unicode(_("Owner {0}")).format(name),
+                flags={'no_search'},
+            )
+
+    def _ldap_search(self, cert=None, filter=None, no_owner=False,
+                     time_limit=None, size_limit=None):
+        ldap = self.api.Backend.ldap2
+
+        if cert is not None:
+            cert_filter = ldap.make_filter_from_attr('usercertificate', cert)
+        else:
+            cert_filter = '(usercertificate=*)'
+
+        if filter is not None:
+            filter = ldap.combine_filters((filter, cert_filter),
+                                          ldap.MATCH_ALL)
+        else:
+            filter = cert_filter
+
+        try:
+            entries, truncated = ldap.find_entries(
+                base_dn=self.api.env.basedn,
+                filter=filter,
+                attrs_list=['usercertificate'],
+                time_limit=time_limit,
+                size_limit=size_limit,
+            )
+        except errors.EmptyResult:
+            entries, truncated = [], False
+
+        result = []
+        result_dict = {}
+        for entry in entries:
+            for attr in ('usercertificate', 'usercertificate;binary'):
+                for value in entry.get(attr, []):
+                    if cert is not None and value != cert:
+                        continue
+                    try:
+                        obj = result_dict[value]
+                    except KeyError:
+                        obj = {'certificate': value}
+                        result.append(obj)
+                        result_dict[value] = obj
+                    if not no_owner:
+                        obj.setdefault('owner', []).append(entry.dn)
+
+        return result, truncated
+
+    def _search(self, cert=None, filter=None, no_owner=False, no_parse=False,
+                time_limit=None, size_limit=None):
+        def ldap_search():
+            return self._ldap_search(
+                cert=cert,
+                filter=filter,
+                no_owner=no_owner,
+                time_limit=time_limit,
+                size_limit=size_limit,
+            )
+
+        if cert is not None and filter is None:
+            result, truncated = [{'certificate': cert}], False
+
+            if not no_owner:
+                ldap_result, ldap_truncated = ldap_search()
+                result = ldap_result or result
+                truncated = ldap_truncated or truncated
+        else:
+            result, truncated = ldap_search()
+
+            if filter is None:
+                # unfiltered search should return all certificates ever,
+                # but we can only return certificates which are in LDAP,
+                # in which case mark the result as truncated
+                truncated = True
+
+        if no_parse:
+            return result, truncated
+
+        for obj in result:
+            for name, hash in (('md5_fingerprint', hashlib.md5),
+                               ('sha1_fingerprint', hashlib.sha1),
+                               ('sha256_fingerprint', hashlib.sha256)):
+                obj[name] = unicode(
+                    nss.data_to_hex(hash(obj['certificate']).digest()))
+
+            nss_cert = x509.load_certificate(obj['certificate'], x509.DER)
+            obj['subject'] = DN(unicode(nss_cert.subject))
+            obj['serial_number'] = nss_cert.serial_number
+            obj['issuer'] = DN(unicode(nss_cert.issuer))
+            obj['valid_not_before'] = _conv_validity(nss_cert.valid_not_before)
+            obj['valid_not_after'] = _conv_validity(nss_cert.valid_not_after)
+            del nss_cert
+
+            dns = obj.pop('owner', [])
+            for owner in _OWNERS:
+                plugin = self.api.Object[owner]
+                suffix = DN(plugin.container_dn, self.api.env.basedn)
+                name = 'owner_{0}'.format(owner)
+                for dn in dns:
+                    if dn.endswith(suffix, 1):
+                        pkey = plugin.get_primary_key_from_dn(dn)
+                        obj.setdefault(name, []).append(pkey)
+
+        return result, truncated
+
+    def _check_revocation(self, result):
+        if self.api.env.ra_plugin != 'dogtag':
+            return
+
+        issuer = DN(('CN', 'Certificate Authority'), x509.subject_base())
+        obj_dict = {obj['serial_number']: obj for obj in result
+                    if obj['issuer'] == issuer}
+        if not obj_dict:
+            return
+
+        options = dict(
+            min_serial_number=min(obj_dict),
+            max_serial_number=max(obj_dict),
+        )
+        try:
+            d_result = self.api.Backend.ra.find(options)
+        except errors.CertificateOperationError:
+            return
+
+        for d_obj in d_result:
+            serial = d_obj['serial_number']
+
+            try:
+                obj = obj_dict[serial]
+            except KeyError:
+                continue
+
+            try:
+                status = d_obj['status']
+            except KeyError:
+                continue
+
+            obj['revoked'] = status in ('REVOKED', 'REVOKED_EXPIRED')
+            if not obj['revoked']:
+                continue
+
+            try:
+                d_obj = self.api.Backend.ra.get_certificate(str(serial))
+            except errors.CertificateOperationError:
+                revocation_reason = None
+            else:
+                revocation_reason = d_obj.get('revocation_reason')
+            obj['revocation_reason'] = revocation_reason or 0
+
+
+class BaseCertificateMethod(Method):
+    takes_options = (
+        File(
+            'file?',
+            label=_("Input filename"),
+            doc=_('File to load the certificate from.'),
+            include='cli',
+        ),
+        Flag(
+            'no_members',
+            doc=_("Suppress processing of membership attributes."),
+            exclude='webui',
+            flags={'no_option', 'no_output'},
+        ),
+        Flag(
+            'check_revocation',
+            doc=_("Check revocation status."),
+        ),
+    )
+
+
+@register()
+class basecert_show(BaseCertificateMethod, Retrieve):
+    __doc__ = _("Display information about a certificate.")
+
+    takes_options = BaseCertificateMethod.takes_options + (
+        Str(
+            'out?',
+            label=_("Output filename"),
+            doc=_("File to store the certificate in."),
+            include='cli',
+        ),
+    )
+
+    def get_args(self):
+        # switch 'certificate' and 'file' in CLI
+        for arg in super(basecert_show, self).get_args():
+            if self.api.env.context != 'cli' or arg.name != 'certificate':
+                yield arg
+        for option in super(basecert_show, self).get_options():
+            if option.name == 'file':
+                yield option
+
+    def get_options(self):
+        # switch 'certificate' and 'file' in CLI
+        for arg in super(basecert_show, self).get_args():
+            if self.api.env.context == 'cli' and arg.name == 'certificate':
+                yield arg.clone(required=False)
+        for option in super(basecert_show, self).get_options():
+            if option.name != 'file':
+                yield option
+
+    def forward(self, *args, **options):
+        if self.api.env.context == 'cli':
+            if args[-1] and 'certificate' in options:
+                raise errors.MutuallyExclusiveError(
+                    reason=_("cannot specify both raw certificate and file"))
+            if not args[-1] and 'certificate' in options:
+                args = args[:-1] + (options.pop('certificate'),)
+
+            out = options.pop('out', None)
+
+        result = super(basecert_show, self).forward(*args, **options)
+
+        if self.api.env.context == 'cli':
+            if out is not None:
+                x509.write_certificate(result['result']['certificate'], out)
+
+        return result
+
+    def execute(self, *args, **options):
+        cert = args[-1]
+        if cert is not None:
+            x509.validate_certificate(cert, x509.DER)
+
+        no_parse = options.get('raw', False)
+
+        result, truncated = self.obj._search(
+            cert=cert,
+            no_owner=options.get('no_members', False),
+            no_parse=no_parse,
+        )
+        if truncated:
+            raise errors.LimitsExceeded()
+
+        assert len(result) == 1, len(result)
+
+        if options.get('check_revocation', False) and not no_parse:
+            self.obj._check_revocation(result)
+
+        result = result[0]
+
+        value = cert
+        if not client_has_capability(options['version'], 'primary_key_types'):
+            value = unicode(base64.b64encode(value))
+
+        return dict(summary=None,
+                    result=result,
+                    value=value)
+
+
+@register()
+class basecert_find(BaseCertificateMethod, Search):
+    __doc__ = _("Search for certificates.")
+
+    takes_options = BaseCertificateMethod.takes_options + (
+        Flag(
+            'pkey_only?',
+            label=_("Primary key only"),
+            doc=_("Results should contain primary key attribute only "
+                  "(\"certificate\")"),
+        ),
+        Int(
+            'timelimit?',
+            label=_("Time Limit"),
+            doc=_("Time limit of search in seconds (0 is unlimited)"),
+            flags={'no_display'},
+            minvalue=0,
+            autofill=False,
+        ),
+        Int(
+            'sizelimit?',
+            label=_("Size Limit"),
+            doc=_("Maximum number of entries returned (0 is unlimited)"),
+            flags={'no_display'},
+            minvalue=0,
+            autofill=False,
+        ),
+    )
+
+    def get_options(self):
+        for option in super(basecert_find, self).get_options():
+            yield option
+
+        for owner in _OWNERS:
+            name = self.api.Object[owner].object_name
+            name_plural = self.api.Object[owner].object_name_plural
+            yield Str(
+                '{0}*'.format(owner),
+                cli_name='{0}s'.format(owner),
+                doc=unicode(
+                    _("Search for certificates with these owner {0}.")
+                    ).format(name_plural),
+                label=name,
+                csv=True,
+            )
+            yield Str(
+                'no_{0}*'.format(owner),
+                cli_name='no_{0}s'.format(owner),
+                doc=unicode(
+                    _("Search for certificates without these owner {0}.")
+                    ).format(name_plural),
+                label=name,
+                csv=True,
+            )
+
+    def forward(self, *args, **options):
+        if self.api.env.context == 'cli':
+            if 'certificate' in options and 'file' in options:
+                raise errors.MutuallyExclusiveError(
+                    reason=_("cannot specify both raw certificate and file"))
+            if 'certificate' not in options and 'file' in options:
+                options['certificate'] = x509.strip_header(options.pop('file'))
+
+        return super(basecert_find, self).forward(*args, **options)
+
+    def execute(self, *args, **options):
+        cert = options.get('certificate')
+        if cert is not None:
+            x509.validate_certificate(cert, x509.DER)
+
+        pkey_only = options.get('pkey_only', False)
+        no_owner = options.get('no_owner', False) or pkey_only
+        no_parse = options.get('raw', False) or pkey_only
+
+        if not args[-1]:
+            ldap = self.api.Backend.ldap2
+
+            filters = []
+            for owner in _OWNERS:
+                for prefix, rule in (('', ldap.MATCH_ALL),
+                                     ('no_', ldap.MATCH_NONE)):
+                    value = options.get(prefix + owner)
+                    if not value:
+                        continue
+                    filter = ldap.make_filter_from_attr(
+                        self.api.Object[owner].primary_key.name, value, rule)
+                    filters.append(filter)
+            if filters:
+                filter = ldap.combine_filters(filters, ldap.MATCH_ALL)
+            else:
+                filter = None
+
+            result, truncated = self.obj._search(
+                cert=cert,
+                filter=filter,
+                no_owner=no_owner,
+                no_parse=no_parse,
+                time_limit=options.get('timelimit'),
+                size_limit=options.get('sizelimit'),
+            )
+        else:
+            # there are no searchable attributes, therefore return empty
+            # result if any criteria are specified
+            result = []
+            truncated = False
+
+        if options.get('check_revocation', False) and not no_parse:
+            self.obj._check_revocation(result)
+
+        if truncated:
+            self.add_message(messages.SearchResultTruncated(
+                reason=_("limits exceeded for this query")))
+
+        return dict(summary=None,
+                    result=result,
+                    truncated=truncated,
+                    count=len(result))
-- 
2.5.5

-- 
Manage your subscription for the Freeipa-devel mailing list:
https://www.redhat.com/mailman/listinfo/freeipa-devel
Contribute to FreeIPA: http://www.freeipa.org/page/Contribute/Code

Reply via email to