On 14.6.2016 11:44, Jan Cholasta wrote:
On 21.4.2016 09:11, Jan Cholasta wrote:
On 6.4.2016 15:46, Pavel Vomacka wrote:


On 03/16/2016 01:50 PM, Jan Cholasta wrote:
Hi,

the attached patches implement the server-side part of
<https://fedorahosted.org/freeipa/ticket/5381>.

Honza

Hi,

thank you for the patches. I tested them and they work well. But I would
like to ask you whether would be possible to extend the response of
'basecert_find' method and probably also 'basecert_show' response. I
think of these information:

1) information whether the certificate is issued by our CA or not.

You can check for that by comparing the issuer name of the certificate
to "CN=Certificate Authority,$SUBJECT_BASE". You can get subject base
from config-show.


2) this probably wouldn't be possible (as we discussed), but I rather
write it too - the information about revocation reason. The same as the
'cert_show' provides.

Added --check-revocation flag to request this information. Currently it
works only on certificates issued by our CA.


3) MD5 and SHA1 fingerprints as the 'cert_show' method returns

Added, also included SHA-256.


Thank you again.

Updated patches attached.

Updated and rebased patches attached. Requires Fraser's sub-CA patches.

Attaching updated patch 623, which fixes these issues found by David: <https://paste.fedoraproject.org/378997/65913663/>.

--
Jan Cholasta
From 4afb5a1bee3e2d152052d6ed924cafe4c1c51187 Mon Sep 17 00:00:00 2001
From: Jan Cholasta <jchol...@redhat.com>
Date: Tue, 14 Jun 2016 09:09:10 +0200
Subject: [PATCH 3/4] cert: add owner information

Get owner information from LDAP in cert-show and cert-find. Allow search by
owner in cert-find.

https://fedorahosted.org/freeipa/ticket/5381
---
 API.txt                   |  15 ++-
 VERSION                   |   4 +-
 ipaserver/plugins/cert.py | 267 ++++++++++++++++++++++++++++++++++++++++------
 3 files changed, 251 insertions(+), 35 deletions(-)

diff --git a/API.txt b/API.txt
index f01d3c1..94bf337 100644
--- a/API.txt
+++ b/API.txt
@@ -730,23 +730,33 @@ output: Entry('result')
 output: Output('summary', type=[<type 'unicode'>, <type 'NoneType'>])
 output: PrimaryKey('value')
 command: cert_find
-args: 1,20,4
+args: 1,30,4
 arg: Str('criteria?')
 option: Flag('all', autofill=True, cli_name='all', default=False)
 option: Str('cacn?', cli_name='ca')
 option: Flag('exactly?', autofill=True, default=False)
+option: Str('host*', cli_name='hosts')
+option: Str('idoverrideuser*', cli_name='idoverrideusers')
 option: DateTime('issuedon_from?', autofill=False)
 option: DateTime('issuedon_to?', autofill=False)
 option: DNParam('issuer?', autofill=False)
 option: Int('max_serial_number?', autofill=False)
 option: Int('min_serial_number?', autofill=False)
+option: Str('no_host*', cli_name='no_hosts')
+option: Str('no_idoverrideuser*', cli_name='no_idoverrideusers')
+option: Flag('no_members', autofill=True, default=True)
+option: Str('no_service*', cli_name='no_services')
+option: Str('no_user*', cli_name='no_users')
 option: Flag('pkey_only?', autofill=True, default=False)
 option: Flag('raw', autofill=True, cli_name='raw', default=False)
 option: Int('revocation_reason?', autofill=False, default=0)
 option: DateTime('revokedon_from?', autofill=False)
 option: DateTime('revokedon_to?', autofill=False)
+option: Str('service*', cli_name='services')
 option: Int('sizelimit?')
 option: Str('subject?', autofill=False)
+option: Int('timelimit?')
+option: Str('user*', cli_name='users')
 option: DateTime('validnotafter_from?', autofill=False)
 option: DateTime('validnotafter_to?', autofill=False)
 option: DateTime('validnotbefore_from?', autofill=False)
@@ -782,10 +792,11 @@ option: Int('revocation_reason', autofill=True, default=0)
 option: Str('version?')
 output: Output('result')
 command: cert_show
-args: 1,5,3
+args: 1,6,3
 arg: Int('serial_number')
 option: Flag('all', autofill=True, cli_name='all', default=False)
 option: Str('cacn?', cli_name='ca')
+option: Flag('no_members', autofill=True, default=False)
 option: Str('out?')
 option: Flag('raw', autofill=True, cli_name='raw', default=False)
 option: Str('version?')
diff --git a/VERSION b/VERSION
index 354aa62..a383578 100644
--- a/VERSION
+++ b/VERSION
@@ -90,5 +90,5 @@ IPA_DATA_VERSION=20100614120000
 #                                                      #
 ########################################################
 IPA_API_VERSION_MAJOR=2
-IPA_API_VERSION_MINOR=185
-# Last change: cert: add object plugin
+IPA_API_VERSION_MINOR=186
+# Last change: cert: add owner information
diff --git a/ipaserver/plugins/cert.py b/ipaserver/plugins/cert.py
index 9364fdd..5698c3a 100644
--- a/ipaserver/plugins/cert.py
+++ b/ipaserver/plugins/cert.py
@@ -19,6 +19,7 @@
 # You should have received a copy of the GNU General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
+import base64
 import binascii
 import datetime
 import os
@@ -110,6 +111,9 @@ EXAMPLES:
  Search for certificates based on issuance date
    ipa cert-find --issuedon-from=2013-02-01 --issuedon-to=2013-02-07
 
+ Search for certificates owned by a specific user:
+   ipa cert-find --user=user
+
 IPA currently immediately issues (or declines) all certificate requests so
 the status of a request is not normally useful. This is for future use
 or the case where a CA does not immediately issue a certificate.
@@ -656,9 +660,48 @@ class cert(BaseCertObject):
                 param = param.clone(flags=param.flags - {'no_search'})
             yield param
 
+        for owner in self._owners():
+            yield owner.primary_key.clone_rename(
+                'owner_{0}'.format(owner.name),
+                required=False,
+                multivalue=True,
+                primary_key=False,
+                label=_("Owner %s") % owner.object_name,
+                flags={'no_create', 'no_update', 'no_search'},
+            )
+
+    def _owners(self):
+        for name in ('user', 'host', 'service', 'idoverrideuser'):
+            yield self.api.Object[name]
+
+    def _fill_owners(self, obj):
+        for owner in self._owners():
+            container_dn = DN(owner.container_dn, self.api.env.basedn)
+            name = 'owner_' + owner.name
+            for dn in obj['owner']:
+                if dn.endswith(container_dn, 1):
+                    value = owner.get_primary_key_from_dn(dn)
+                    obj.setdefault(name, []).append(value)
+
+
+class CertMethod(BaseCertMethod):
+    def get_options(self):
+        for option in super(CertMethod, self).get_options():
+            yield option
+
+        for o in self.has_output:
+            if isinstance(o, (output.Entry, output.ListOfEntries)):
+                yield Flag(
+                    'no_members',
+                    doc=_("Suppress processing of membership attributes."),
+                    exclude='webui',
+                    flags={'no_output'},
+                )
+                break
+
 
 @register()
-class cert_show(Retrieve, BaseCertMethod, VirtualCommand):
+class cert_show(Retrieve, CertMethod, VirtualCommand):
     __doc__ = _('Retrieve an existing certificate.')
 
     takes_options = (
@@ -671,7 +714,8 @@ class cert_show(Retrieve, BaseCertMethod, VirtualCommand):
 
     operation="retrieve certificate"
 
-    def execute(self, serial_number, all=False, raw=False, **options):
+    def execute(self, serial_number, all=False, raw=False, no_members=False,
+                **options):
         ca_enabled_check()
         hostname = None
         try:
@@ -701,10 +745,26 @@ class cert_show(Retrieve, BaseCertMethod, VirtualCommand):
                     "issued by CA '%(ca)s' not found")
                     % dict(serial=serial_number, ca=options['cacn']))
 
+        if all or not no_members:
+            ldap = self.api.Backend.ldap2
+            filter = ldap.make_filter_from_attr(
+                'usercertificate', base64.b64decode(result['certificate']))
+            try:
+                entries = ldap.get_entries(base_dn=self.api.env.basedn,
+                                           filter=filter,
+                                           attrs_list=[''])
+            except errors.EmptyResult:
+                entries = []
+            for entry in entries:
+                result.setdefault('owner', []).append(entry.dn)
+
         if not raw:
             result['certificate'] = result['certificate'].replace('\r\n', '')
             self.obj._parse(result)
             result['revoked'] = ('revocation_reason' in result)
+            if 'owner' in result:
+                self.obj._fill_owners(result)
+                del result['owner']
 
         if hostname:
             # If we have a hostname we want to verify that the subject
@@ -716,7 +776,7 @@ class cert_show(Retrieve, BaseCertMethod, VirtualCommand):
 
 
 @register()
-class cert_revoke(PKQuery, BaseCertMethod, VirtualCommand):
+class cert_revoke(PKQuery, CertMethod, VirtualCommand):
     __doc__ = _('Revoke a certificate.')
 
     operation = "revoke certificate"
@@ -752,7 +812,7 @@ class cert_revoke(PKQuery, BaseCertMethod, VirtualCommand):
 
 
 @register()
-class cert_remove_hold(PKQuery, BaseCertMethod, VirtualCommand):
+class cert_remove_hold(PKQuery, CertMethod, VirtualCommand):
     __doc__ = _('Take a revoked certificate off hold.')
 
     has_output_params = (
@@ -781,7 +841,7 @@ class cert_remove_hold(PKQuery, BaseCertMethod, VirtualCommand):
 
 
 @register()
-class cert_find(Search, BaseCertMethod):
+class cert_find(Search, CertMethod):
     __doc__ = _('Search for existing certificates.')
 
     takes_options = (
@@ -851,6 +911,11 @@ class cert_find(Search, BaseCertMethod):
             doc=_("Results should contain primary key attribute only "
                   "(\"certificate\")"),
         ),
+        Int('timelimit?',
+            label=_('Time Limit'),
+            doc=_('Time limit of search in seconds (0 is unlimited)'),
+            minvalue=0,
+        ),
         Int('sizelimit?',
             label=_("Size Limit"),
             doc=_("Maximum number of entries returned (0 is unlimited)"),
@@ -862,9 +927,63 @@ class cert_find(Search, BaseCertMethod):
         '%(count)d certificate matched', '%(count)d certificates matched', 0
     )
 
+    def get_options(self):
+        for option in super(cert_find, self).get_options():
+            if option.name == 'no_members':
+                option = option.clone(default=True,
+                                      flags=set(option.flags) | {'no_option'})
+            yield option
+
+        for owner in self.obj._owners():
+            yield owner.primary_key.clone_rename(
+                '{0}'.format(owner.name),
+                required=False,
+                multivalue=True,
+                primary_key=False,
+                query=True,
+                cli_name='{0}s'.format(owner.name),
+                doc=(_("Search for certificates with these owner %s.") %
+                     owner.object_name_plural),
+                label=owner.object_name,
+            )
+            yield owner.primary_key.clone_rename(
+                'no_{0}'.format(owner.name),
+                required=False,
+                multivalue=True,
+                primary_key=False,
+                query=True,
+                cli_name='no_{0}s'.format(owner.name),
+                doc=(_("Search for certificates without these owner %s.") %
+                     owner.object_name_plural),
+                label=owner.object_name,
+            )
+
     def execute(self, criteria=None, all=False, raw=False, pkey_only=False,
-                sizelimit=None, **options):
-        ca_enabled_check()
+                no_members=True, timelimit=None, sizelimit=None, **options):
+        ca_options = {'cacn',
+                      'revocation_reason',
+                      'issuer',
+                      'subject',
+                      'min_serial_number', 'max_serial_number',
+                      'exactly',
+                      'validnotafter_from', 'validnotafter_to',
+                      'validnotbefore_from', 'validnotbefore_to',
+                      'issuedon_from', 'issuedon_to',
+                      'revokedon_from', 'revokedon_to'}
+        ldap_options = {prefix + owner.name
+                        for owner in self.obj._owners()
+                        for prefix in ('', 'no_')}
+        has_ca_options = any(options.get(name) for name in ca_options)
+        has_ldap_options = any(options.get(name) for name in ldap_options)
+
+        try:
+            ca_enabled_check()
+        except errors.NotFound:
+            if has_ca_options:
+                raise
+            ca_enabled = False
+        else:
+            ca_enabled = True
 
         if 'cacn' in options:
             ca_obj = api.Command.ca_show(options['cacn'])['result']
@@ -881,47 +1000,133 @@ class cert_find(Search, BaseCertMethod):
             return dict(result=[], count=0, truncated=False)
 
         obj_seq = []
+        obj_dict = {}
+        truncated = False
+
+        if ca_enabled:
+            ra_options = {}
+            for name, value in options.items():
+                if name not in ca_options:
+                    continue
+                if isinstance(value, datetime.datetime):
+                    value = value.strftime(PKIDATE_FORMAT)
+                ra_options[name] = value
+            if sizelimit is not None:
+                ra_options['sizelimit'] = sizelimit
+
+            for ra_obj in self.Backend.ra.find(ra_options):
+                obj = {}
+                if ((not pkey_only and all) or
+                        not no_members or
+                        not has_ca_options or
+                        has_ldap_options):
+                    ra_obj.update(
+                        self.Backend.ra.get_certificate(
+                            str(ra_obj['serial_number'])))
+                    cert = base64.b64decode(ra_obj['certificate'])
+                    obj_dict[cert] = obj
+                obj_seq.append(obj)
+                obj.update(ra_obj)
+
+        if ((not pkey_only and all) or
+                not no_members or
+                not has_ca_options or
+                has_ldap_options):
+            ldap = self.api.Backend.ldap2
+
+            filters = []
+            cert_filter = '(usercertificate=*)'
+            filters.append(cert_filter)
+            for owner in self.obj._owners():
+                oc_filter = ldap.make_filter_from_attr(
+                    'objectclass', owner.object_class, ldap.MATCH_ALL)
+                for prefix, rule in (('', ldap.MATCH_ALL),
+                                     ('no_', ldap.MATCH_NONE)):
+                    value = options.get(prefix + owner.name)
+                    if value is None:
+                        continue
+                    pkey_filter = ldap.make_filter_from_attr(
+                        owner.primary_key.name, value, rule)
+                    filters.append(oc_filter)
+                    filters.append(pkey_filter)
+            filter = ldap.combine_filters(filters, ldap.MATCH_ALL)
 
-        ra_options = {}
-        for name, value in options.items():
-            if isinstance(value, datetime.datetime):
-                value = value.strftime(PKIDATE_FORMAT)
-            ra_options[name] = value
-        if sizelimit is not None:
-            ra_options['sizelimit'] = sizelimit
-
-        for ra_obj in self.Backend.ra.find(ra_options):
-            obj = {}
-            if all:
-                ra_obj.update(
-                    self.Backend.ra.get_certificate(
-                        str(ra_obj['serial_number'])))
-            obj_seq.append(obj)
-            obj.update(ra_obj)
+            try:
+                entries, truncated = ldap.find_entries(
+                    base_dn=self.api.env.basedn,
+                    filter=filter,
+                    attrs_list=['usercertificate'],
+                    time_limit=timelimit,
+                    size_limit=sizelimit,
+                )
+            except errors.EmptyResult:
+                entries, truncated = [], False
+            for entry in entries:
+                seen = set()
+                for attr in ('usercertificate', 'usercertificate;binary'):
+                    for cert in entry.get(attr, []):
+                        if cert in seen:
+                            continue
+                        seen.add(cert)
+                        try:
+                            obj = obj_dict[cert]
+                        except KeyError:
+                            if has_ca_options:
+                                continue
+                            obj = {
+                                'certificate': unicode(base64.b64encode(cert))}
+                            obj_seq.append(obj)
+                            obj_dict[cert] = obj
+                        obj.setdefault('owner', []).append(entry.dn)
 
         result = []
         for obj in obj_seq:
+            if has_ldap_options and 'owner' not in obj:
+                continue
             if not pkey_only:
                 if not raw:
                     if 'certificate' in obj:
                         obj['certificate'] = (
                             obj['certificate'].replace('\r\n', ''))
                         self.obj._parse(obj)
-                    obj['subject'] = DN(obj['subject'])
-                    obj['issuer'] = DN(obj['issuer'])
-                    obj['revoked'] = (
-                        obj['status'] in (u'REVOKED', u'REVOKED_EXPIRED'))
+                        if not all:
+                            del obj['certificate']
+                            del obj['valid_not_before']
+                            del obj['valid_not_after']
+                            del obj['md5_fingerprint']
+                            del obj['sha1_fingerprint']
+                    if 'subject' in obj:
+                        obj['subject'] = DN(obj['subject'])
+                    if 'issuer' in obj:
+                        obj['issuer'] = DN(obj['issuer'])
+                    if 'status' in obj:
+                        obj['revoked'] = (
+                            obj['status'] in (u'REVOKED', u'REVOKED_EXPIRED'))
+                    if 'owner' in obj:
+                        if all or not no_members:
+                            self.obj._fill_owners(obj)
+                        del obj['owner']
+                else:
+                    if 'certificate' in obj:
+                        if not all:
+                            del obj['certificate']
+                    if 'owner' in obj:
+                        if not all and no_members:
+                            del obj['owner']
             else:
-                serial_number = obj['serial_number']
-                obj.clear()
-                obj['serial_number'] = serial_number
+                if 'serial_number' in obj:
+                    serial_number = obj['serial_number']
+                    obj.clear()
+                    obj['serial_number'] = serial_number
+                else:
+                    obj.clear()
             result.append(obj)
 
         ret = dict(
             result=result
         )
         ret['count'] = len(ret['result'])
-        ret['truncated'] = False
+        ret['truncated'] = bool(truncated)
         return ret
 
 
-- 
2.7.4

-- 
Manage your subscription for the Freeipa-devel mailing list:
https://www.redhat.com/mailman/listinfo/freeipa-devel
Contribute to FreeIPA: http://www.freeipa.org/page/Contribute/Code

Reply via email to