On 20.6.2016 15:31, Jan Cholasta wrote:
On 20.6.2016 09:54, Jan Cholasta wrote:
On 15.6.2016 12:33, Jan Cholasta wrote:
On 14.6.2016 11:44, Jan Cholasta wrote:
On 21.4.2016 09:11, Jan Cholasta wrote:
On 6.4.2016 15:46, Pavel Vomacka wrote:
On 03/16/2016 01:50 PM, Jan Cholasta wrote:
Hi,
the attached patches implement the server-side part of
<https://fedorahosted.org/freeipa/ticket/5381>.
Honza
Hi,
thank you for the patches. I tested them and they work well. But I
would
like to ask you whether would be possible to extend the response of
'basecert_find' method and probably also 'basecert_show' response. I
think of these information:
1) information whether the certificate is issued by our CA or not.
You can check for that by comparing the issuer name of the certificate
to "CN=Certificate Authority,$SUBJECT_BASE". You can get subject base
from config-show.
2) this probably wouldn't be possible (as we discussed), but I rather
write it too - the information about revocation reason. The same as
the
'cert_show' provides.
Added --check-revocation flag to request this information.
Currently it
works only on certificates issued by our CA.
3) MD5 and SHA1 fingerprints as the 'cert_show' method returns
Added, also included SHA-256.
Thank you again.
Updated patches attached.
Updated and rebased patches attached. Requires Fraser's sub-CA patches.
Attaching updated patch 623, which fixes these issues found by David:
<https://paste.fedoraproject.org/378997/65913663/>.
Updated and rebased patches attached.
Attaching updated patches 552 and 623, which fix the --sizelimit option.
Updated and rebased patches attached. The --revocation-reason option now
works as expected.
--
Jan Cholasta
From c934c8b13d663177c0fa4344738052753e38ff1d Mon Sep 17 00:00:00 2001
From: Jan Cholasta <jchol...@redhat.com>
Date: Wed, 16 Mar 2016 13:09:11 +0100
Subject: [PATCH 1/4] ldap: fix handling of binary data in search filters
This fixes a UnicodeDecodeError when passing non-UTF-8 binary data to
LDAPClient.make_filter() and friends.
https://fedorahosted.org/freeipa/ticket/5381
---
ipapython/ipaldap.py | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
diff --git a/ipapython/ipaldap.py b/ipapython/ipaldap.py
index 410ddae..23405c6 100644
--- a/ipapython/ipaldap.py
+++ b/ipapython/ipaldap.py
@@ -1211,7 +1211,12 @@ class LDAPClient(object):
]
return self.combine_filters(flts, rules)
elif value is not None:
- value = ldap.filter.escape_filter_chars(value_to_utf8(value))
+ if isinstance(value, bytes):
+ if six.PY3:
+ value = value.decode('raw_unicode_escape')
+ else:
+ value = value_to_utf8(value)
+ value = ldap.filter.escape_filter_chars(value)
if not exact:
template = '%s'
if leading_wildcard:
--
2.9.0
From 452893253f32c7be01d43ce9ed76fba84fd2bab6 Mon Sep 17 00:00:00 2001
From: Jan Cholasta <jchol...@redhat.com>
Date: Tue, 14 Jun 2016 06:29:18 +0200
Subject: [PATCH 2/4] cert: add object plugin
Implement cert as an object with methods rather than a bunch of loosely
related commands.
https://fedorahosted.org/freeipa/ticket/5381
---
API.txt | 60 ++++--
VERSION | 4 +-
ipaclient/plugins/cert.py | 6 +-
ipaserver/plugins/cert.py | 522 +++++++++++++++++++++++++---------------------
4 files changed, 327 insertions(+), 265 deletions(-)
diff --git a/API.txt b/API.txt
index f2a0686..4d16c50 100644
--- a/API.txt
+++ b/API.txt
@@ -723,25 +723,27 @@ output: Entry('result')
output: Output('summary', type=[<type 'unicode'>, <type 'NoneType'>])
output: PrimaryKey('value')
command: cert_find
-args: 0,19,4
+args: 1,20,4
+arg: Str('criteria?')
option: Flag('all', autofill=True, cli_name='all', default=False)
-option: Str('cacn?', autofill=False, cli_name='ca')
+option: Str('cacn?', cli_name='ca')
option: Flag('exactly?', autofill=True, default=False)
-option: Str('issuedon_from?', autofill=False)
-option: Str('issuedon_to?', autofill=False)
-option: Str('issuer?', autofill=False)
+option: DateTime('issuedon_from?', autofill=False)
+option: DateTime('issuedon_to?', autofill=False)
+option: DNParam('issuer?', autofill=False)
option: Int('max_serial_number?', autofill=False)
option: Int('min_serial_number?', autofill=False)
+option: Flag('pkey_only?', autofill=True, default=False)
option: Flag('raw', autofill=True, cli_name='raw', default=False)
option: Int('revocation_reason?', autofill=False)
-option: Str('revokedon_from?', autofill=False)
-option: Str('revokedon_to?', autofill=False)
-option: Int('sizelimit?', default=100)
+option: DateTime('revokedon_from?', autofill=False)
+option: DateTime('revokedon_to?', autofill=False)
+option: Int('sizelimit?')
option: Str('subject?', autofill=False)
-option: Str('validnotafter_from?', autofill=False)
-option: Str('validnotafter_to?', autofill=False)
-option: Str('validnotbefore_from?', autofill=False)
-option: Str('validnotbefore_to?', autofill=False)
+option: DateTime('validnotafter_from?', autofill=False)
+option: DateTime('validnotafter_to?', autofill=False)
+option: DateTime('validnotbefore_from?', autofill=False)
+option: DateTime('validnotbefore_to?', autofill=False)
option: Str('version?')
output: Output('count', type=[<type 'int'>])
output: ListOfEntries('result')
@@ -749,37 +751,49 @@ output: Output('summary', type=[<type 'unicode'>, <type 'NoneType'>])
output: Output('truncated', type=[<type 'bool'>])
command: cert_remove_hold
args: 1,1,1
-arg: Str('serial_number')
+arg: Int('serial_number')
option: Str('version?')
output: Output('result')
command: cert_request
-args: 1,6,1
+args: 1,8,3
arg: Str('csr', cli_name='csr_file')
option: Flag('add', autofill=True, default=False)
+option: Flag('all', autofill=True, cli_name='all', default=False)
option: Str('cacn?', cli_name='ca')
option: Str('principal')
option: Str('profile_id?')
+option: Flag('raw', autofill=True, cli_name='raw', default=False)
option: Str('request_type', autofill=True, default=u'pkcs10')
option: Str('version?')
-output: Output('result', type=[<type 'dict'>])
+output: Entry('result')
+output: Output('summary', type=[<type 'unicode'>, <type 'NoneType'>])
+output: PrimaryKey('value')
command: cert_revoke
args: 1,2,1
-arg: Str('serial_number')
+arg: Int('serial_number')
option: Int('revocation_reason', autofill=True, default=0)
option: Str('version?')
output: Output('result')
command: cert_show
-args: 1,3,1
-arg: Str('serial_number')
-option: Str('cacn?', autofill=False, cli_name='ca')
+args: 1,5,3
+arg: Int('serial_number')
+option: Flag('all', autofill=True, cli_name='all', default=False)
+option: Str('cacn?', cli_name='ca')
option: Str('out?')
+option: Flag('raw', autofill=True, cli_name='raw', default=False)
option: Str('version?')
-output: Output('result')
+output: Entry('result')
+output: Output('summary', type=[<type 'unicode'>, <type 'NoneType'>])
+output: PrimaryKey('value')
command: cert_status
-args: 1,1,1
-arg: Str('request_id')
+args: 1,3,3
+arg: Int('request_id')
+option: Flag('all', autofill=True, cli_name='all', default=False)
+option: Flag('raw', autofill=True, cli_name='raw', default=False)
option: Str('version?')
-output: Output('result')
+output: Entry('result')
+output: Output('summary', type=[<type 'unicode'>, <type 'NoneType'>])
+output: PrimaryKey('value')
command: certprofile_del
args: 1,2,3
arg: Str('cn+', cli_name='id')
diff --git a/VERSION b/VERSION
index faf10e3..32e9b79 100644
--- a/VERSION
+++ b/VERSION
@@ -90,5 +90,5 @@ IPA_DATA_VERSION=20100614120000
# #
########################################################
IPA_API_VERSION_MAJOR=2
-IPA_API_VERSION_MINOR=193
-# Last change: schema: remove `no_cli` from command schema
+IPA_API_VERSION_MINOR=194
+# Last change: cert: add object plugin
diff --git a/ipaclient/plugins/cert.py b/ipaclient/plugins/cert.py
index 1b840ac..7e8e156 100644
--- a/ipaclient/plugins/cert.py
+++ b/ipaclient/plugins/cert.py
@@ -19,7 +19,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-from ipaclient.frontend import CommandOverride
+from ipaclient.frontend import MethodOverride
from ipalib import errors
from ipalib import x509
from ipalib import util
@@ -30,7 +30,7 @@ register = Registry()
@register(override=True)
-class cert_request(CommandOverride):
+class cert_request(MethodOverride):
def get_args(self):
for arg in super(cert_request, self).get_args():
if arg.name == 'csr':
@@ -39,7 +39,7 @@ class cert_request(CommandOverride):
@register(override=True)
-class cert_show(CommandOverride):
+class cert_show(MethodOverride):
def forward(self, *keys, **options):
if 'out' in options:
util.check_writable_file(options['out'])
diff --git a/ipaserver/plugins/cert.py b/ipaserver/plugins/cert.py
index 76a2fbc..3b180ee 100644
--- a/ipaserver/plugins/cert.py
+++ b/ipaserver/plugins/cert.py
@@ -19,9 +19,14 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import os
-import time
import binascii
+import datetime
+import os
+
+from nss import nss
+from nss.error import NSPRError
+from pyasn1.error import PyAsn1Error
+import six
from ipalib import Command, Str, Int, Flag
from ipalib import api
@@ -30,6 +35,9 @@ from ipalib import pkcs10
from ipalib import x509
from ipalib import ngettext
from ipalib.constants import IPA_CA_CN
+from ipalib.crud import Create, PKQuery, Retrieve, Search
+from ipalib.frontend import Method, Object
+from ipalib.parameters import Bytes, DateTime, DNParam
from ipalib.plugable import Registry
from .virtual import VirtualCommand
from .baseldap import pkey_to_value
@@ -42,11 +50,6 @@ from ipalib import output
from .service import validate_principal
from ipapython.dn import DN
-import six
-import nss.nss as nss
-from nss.error import NSPRError
-from pyasn1.error import PyAsn1Error
-
if six.PY3:
unicode = str
@@ -134,16 +137,12 @@ USER, HOST, SERVICE = range(3)
register = Registry()
-def validate_pkidate(ugettext, value):
- """
- A date in the format of %Y-%m-%d
- """
- try:
- ts = time.strptime(value, '%Y-%m-%d')
- except ValueError as e:
- return str(e)
+PKIDATE_FORMAT = '%Y-%m-%d'
+
+
+def normalize_pkidate(value):
+ return datetime.datetime.strptime(value, PKIDATE_FORMAT)
- return None
def validate_csr(ugettext, csr):
"""
@@ -182,7 +181,8 @@ def normalize_csr(csr):
return csr
-def _convert_serial_number(num):
+
+def normalize_serial_number(num):
"""
Convert a SN given in decimal or hexadecimal.
Returns the number or None if conversion fails.
@@ -195,18 +195,10 @@ def _convert_serial_number(num):
# hexa without prefix
num = int(num, 16)
except ValueError:
- num = None
-
- return num
+ pass
-def validate_serial_number(ugettext, num):
- if _convert_serial_number(num) == None:
- return u"Decimal or hexadecimal number is required for serial number"
- return None
+ return unicode(num)
-def normalize_serial_number(num):
- # It's been already validated
- return unicode(_convert_serial_number(num))
def get_host_from_principal(principal):
"""
@@ -242,85 +234,154 @@ def caacl_check(principal_type, principal_string, ca, profile_id):
)
)
-@register()
-class cert_request(VirtualCommand):
- __doc__ = _('Submit a certificate signing request.')
-
- takes_args = (
- Str(
- 'csr', validate_csr,
- label=_('CSR'),
- cli_name='csr_file',
- normalizer=normalize_csr,
- noextrawhitespace=False,
- ),
- )
- operation="request certificate"
-
- takes_options = (
- Str('principal',
- label=_('Principal'),
- doc=_('Principal for this certificate (e.g. HTTP/test.example.com)'),
- ),
- Str('request_type',
- default=u'pkcs10',
- autofill=True,
- ),
- Flag('add',
- doc=_("automatically add the principal if it doesn't exist"),
- default=False,
- autofill=True
- ),
- Str('profile_id?', validate_profile_id,
- label=_("Profile ID"),
- doc=_("Certificate Profile to use"),
- ),
- Str('cacn?',
- cli_name='ca',
- query=True,
- label=_("CA"),
- doc=_("CA to use"),
- ),
- )
- has_output_params = (
- Str('certificate',
- label=_('Certificate'),
+class BaseCertObject(Object):
+ takes_params = (
+ Bytes(
+ 'certificate',
+ label=_("Certificate"),
+ doc=_("Base-64 encoded certificate."),
+ flags={'no_create', 'no_update', 'no_search'},
),
- Str('subject',
+ DNParam(
+ 'subject',
label=_('Subject'),
+ flags={'no_create', 'no_update', 'no_search'},
),
- Str('issuer',
+ DNParam(
+ 'issuer',
label=_('Issuer'),
+ doc=_('Issuer DN'),
+ flags={'no_create', 'no_update', 'no_search'},
),
- Str('valid_not_before',
+ DateTime(
+ 'valid_not_before',
label=_('Not Before'),
+ flags={'no_create', 'no_update', 'no_search'},
),
- Str('valid_not_after',
+ DateTime(
+ 'valid_not_after',
label=_('Not After'),
+ flags={'no_create', 'no_update', 'no_search'},
),
- Str('md5_fingerprint',
+ Str(
+ 'md5_fingerprint',
label=_('Fingerprint (MD5)'),
+ flags={'no_create', 'no_update', 'no_search'},
),
- Str('sha1_fingerprint',
+ Str(
+ 'sha1_fingerprint',
label=_('Fingerprint (SHA1)'),
+ flags={'no_create', 'no_update', 'no_search'},
),
- Str('serial_number',
+ Int(
+ 'serial_number',
label=_('Serial number'),
+ doc=_('Serial number in decimal or if prefixed with 0x in hexadecimal'),
+ normalizer=normalize_serial_number,
+ flags={'no_create', 'no_update', 'no_search'},
),
- Str('serial_number_hex',
+ Str(
+ 'serial_number_hex',
label=_('Serial number (hex)'),
+ flags={'no_create', 'no_update', 'no_search'},
),
)
- has_output = (
- output.Output('result',
- type=dict,
- doc=_('Dictionary mapping variable name to value'),
+ def _parse(self, obj):
+ cert = x509.load_certificate(obj['certificate'])
+ obj['subject'] = DN(unicode(cert.subject))
+ obj['issuer'] = DN(unicode(cert.issuer))
+ obj['valid_not_before'] = unicode(cert.valid_not_before_str)
+ obj['valid_not_after'] = unicode(cert.valid_not_after_str)
+ obj['md5_fingerprint'] = unicode(
+ nss.data_to_hex(nss.md5_digest(cert.der_data), 64)[0])
+ obj['sha1_fingerprint'] = unicode(
+ nss.data_to_hex(nss.sha1_digest(cert.der_data), 64)[0])
+ obj['serial_number'] = cert.serial_number
+ obj['serial_number_hex'] = u'0x%X' % cert.serial_number
+
+
+class BaseCertMethod(Method):
+ def get_options(self):
+ yield Str('cacn?',
+ cli_name='ca',
+ query=True,
+ label=_('Issuing CA'),
+ doc=_('Name of issuing CA'),
+ )
+
+ for option in super(BaseCertMethod, self).get_options():
+ yield option
+
+
+@register()
+class certreq(BaseCertObject):
+ takes_params = BaseCertObject.takes_params + (
+ Str(
+ 'request_type',
+ default=u'pkcs10',
+ autofill=True,
+ flags={'no_update', 'no_update', 'no_search'},
+ ),
+ Str(
+ 'profile_id?', validate_profile_id,
+ label=_("Profile ID"),
+ doc=_("Certificate Profile to use"),
+ flags={'no_update', 'no_update', 'no_search'},
+ ),
+ Str(
+ 'cert_request_status',
+ label=_('Request status'),
+ flags={'no_create', 'no_update', 'no_search'},
+ ),
+ Int(
+ 'request_id',
+ label=_('Request id'),
+ primary_key=True,
+ flags={'no_create', 'no_update', 'no_search', 'no_output'},
+ ),
+ )
+
+
+@register()
+class cert_request(Create, BaseCertMethod, VirtualCommand):
+ __doc__ = _('Submit a certificate signing request.')
+
+ obj_name = 'certreq'
+ attr_name = 'request'
+
+ takes_args = (
+ Str(
+ 'csr', validate_csr,
+ label=_('CSR'),
+ cli_name='csr_file',
+ normalizer=normalize_csr,
+ noextrawhitespace=False,
+ ),
+ )
+ operation="request certificate"
+
+ takes_options = (
+ Str(
+ 'principal',
+ label=_('Principal'),
+ doc=_('Principal for this certificate (e.g. HTTP/test.example.com)'),
+ ),
+ Flag(
+ 'add',
+ doc=_("automatically add the principal if it doesn't exist"),
),
)
- def execute(self, csr, **kw):
+ def get_args(self):
+ # FIXME: the 'no_create' flag is ignored for positional arguments
+ for arg in super(cert_request, self).get_args():
+ if arg.name == 'request_id':
+ continue
+ yield arg
+
+ def execute(self, csr, all=False, raw=False, **kw):
ca_enabled_check()
ldap = self.api.Backend.ldap2
@@ -512,12 +573,9 @@ class cert_request(VirtualCommand):
# Request the certificate
result = self.Backend.ra.request_certificate(
csr, profile_id, ca_id, request_type=request_type)
- cert = x509.load_certificate(result['certificate'])
- result['issuer'] = unicode(cert.issuer)
- result['valid_not_before'] = unicode(cert.valid_not_before_str)
- result['valid_not_after'] = unicode(cert.valid_not_after_str)
- result['md5_fingerprint'] = unicode(nss.data_to_hex(nss.md5_digest(cert.der_data), 64)[0])
- result['sha1_fingerprint'] = unicode(nss.data_to_hex(nss.sha1_digest(cert.der_data), 64)[0])
+ if not raw:
+ self.obj._parse(result)
+ result['request_id'] = int(result['request_id'])
# Success? Then add it to the principal's entry
# (unless the profile tells us not to)
@@ -534,91 +592,72 @@ class cert_request(VirtualCommand):
api.Command['user_mod'](principal_name, **kwargs)
return dict(
- result=result
+ result=result,
+ value=pkey_to_value(int(result['request_id']), kw),
)
-
@register()
-class cert_status(VirtualCommand):
+class cert_status(Retrieve, BaseCertMethod, VirtualCommand):
__doc__ = _('Check the status of a certificate signing request.')
- takes_args = (
- Str('request_id',
- label=_('Request id'),
- flags=['no_create', 'no_update', 'no_search'],
- ),
- )
- has_output_params = (
- Str('cert_request_status',
- label=_('Request status'),
- ),
- takes_args[0],
- )
+ obj_name = 'certreq'
+ attr_name = 'status'
+
operation = "certificate status"
+ def get_options(self):
+ for option in super(cert_status, self).get_options():
+ if option.name == 'cacn':
+ continue
+ yield option
def execute(self, request_id, **kw):
ca_enabled_check()
self.check_access()
return dict(
- result=self.Backend.ra.check_request_status(request_id)
+ result=self.Backend.ra.check_request_status(str(request_id)),
+ value=pkey_to_value(request_id, kw),
)
-
-_serial_number = Str('serial_number',
- validate_serial_number,
- label=_('Serial number'),
- doc=_('Serial number in decimal or if prefixed with 0x in hexadecimal'),
- normalizer=normalize_serial_number,
-)
-
@register()
-class cert_show(VirtualCommand):
- __doc__ = _('Retrieve an existing certificate.')
-
- takes_args = _serial_number
-
- has_output_params = (
- Str('certificate',
- label=_('Certificate'),
- ),
- Str('subject',
- label=_('Subject'),
- ),
- Str('issuer',
- label=_('Issuer'),
- ),
- Str('valid_not_before',
- label=_('Not Before'),
- ),
- Str('valid_not_after',
- label=_('Not After'),
- ),
- Str('md5_fingerprint',
- label=_('Fingerprint (MD5)'),
+class cert(BaseCertObject):
+ takes_params = BaseCertObject.takes_params + (
+ Str(
+ 'status',
+ label=_('Status'),
+ flags={'no_create', 'no_update', 'no_search'},
),
- Str('sha1_fingerprint',
- label=_('Fingerprint (SHA1)'),
+ Flag(
+ 'revoked',
+ label=_('Revoked'),
+ flags={'no_create', 'no_update', 'no_search'},
),
- Str('revocation_reason',
+ Int(
+ 'revocation_reason',
label=_('Revocation reason'),
+ doc=_('Reason for revoking the certificate (0-10)'),
+ minvalue=0,
+ maxvalue=10,
+ flags={'no_create', 'no_update'},
),
- Str('serial_number_hex',
- label=_('Serial number (hex)'),
- ),
- _serial_number,
)
+ def get_params(self):
+ for param in super(cert, self).get_params():
+ if param.name == 'serial_number':
+ param = param.clone(primary_key=True)
+ elif param.name == 'issuer':
+ param = param.clone(flags=param.flags - {'no_search'})
+ yield param
+
+
+@register()
+class cert_show(Retrieve, BaseCertMethod, VirtualCommand):
+ __doc__ = _('Retrieve an existing certificate.')
+
takes_options = (
- Str('cacn?',
- cli_name='ca',
- query=True,
- label=_('Issuing CA'),
- doc=_('Name of issuing CA'),
- autofill=False,
- ),
Str('out?',
label=_('Output filename'),
doc=_('File to store the certificate in.'),
@@ -628,7 +667,7 @@ class cert_show(VirtualCommand):
operation="retrieve certificate"
- def execute(self, serial_number, **options):
+ def execute(self, serial_number, all=False, raw=False, **options):
ca_enabled_check()
hostname = None
try:
@@ -648,7 +687,7 @@ class cert_show(VirtualCommand):
# Dogtag lightweight CAs have shared serial number domain, so
# we don't tell Dogtag the issuer (but we check the cert after).
#
- result=self.Backend.ra.get_certificate(serial_number)
+ result = self.Backend.ra.get_certificate(str(serial_number))
cert = x509.load_certificate(result['certificate'])
if issuer_dn is not None and DN(unicode(cert.issuer)) != DN(issuer_dn):
@@ -658,48 +697,37 @@ class cert_show(VirtualCommand):
"issued by CA '%(ca)s' not found")
% dict(serial=serial_number, ca=options['cacn']))
- result['subject'] = unicode(cert.subject)
- result['issuer'] = unicode(cert.issuer)
- result['valid_not_before'] = unicode(cert.valid_not_before_str)
- result['valid_not_after'] = unicode(cert.valid_not_after_str)
- result['md5_fingerprint'] = unicode(nss.data_to_hex(nss.md5_digest(cert.der_data), 64)[0])
- result['sha1_fingerprint'] = unicode(nss.data_to_hex(nss.sha1_digest(cert.der_data), 64)[0])
+ if not raw:
+ result['certificate'] = result['certificate'].replace('\r\n', '')
+ self.obj._parse(result)
+ result['revoked'] = ('revocation_reason' in result)
+
if hostname:
# If we have a hostname we want to verify that the subject
# of the certificate matches it, otherwise raise an error
if hostname != cert.subject.common_name: #pylint: disable=E1101
raise acierr
- return dict(result=result)
-
-
+ return dict(result=result, value=pkey_to_value(serial_number, options))
@register()
-class cert_revoke(VirtualCommand):
+class cert_revoke(PKQuery, BaseCertMethod, VirtualCommand):
__doc__ = _('Revoke a certificate.')
- takes_args = _serial_number
-
- has_output_params = (
- Flag('revoked',
- label=_('Revoked'),
- ),
- )
operation = "revoke certificate"
- # FIXME: The default is 0. Is this really an Int param?
- takes_options = (
- Int('revocation_reason',
- label=_('Reason'),
- doc=_('Reason for revoking the certificate (0-10). Type '
- '"ipa help cert" for revocation reason details. '),
- minvalue=0,
- maxvalue=10,
+ def get_options(self):
+ # FIXME: The default is 0. Is this really an Int param?
+ yield self.obj.params['revocation_reason'].clone(
default=0,
- autofill=True
- ),
- )
+ autofill=True,
+ )
+
+ for option in super(cert_revoke, self).get_options():
+ if option.name == 'cacn':
+ continue
+ yield option
def execute(self, serial_number, **kw):
ca_enabled_check()
@@ -719,17 +747,15 @@ class cert_revoke(VirtualCommand):
raise errors.CertificateOperationError(error=_('7 is not a valid revocation reason'))
return dict(
result=self.Backend.ra.revoke_certificate(
- serial_number, revocation_reason=revocation_reason)
+ str(serial_number), revocation_reason=revocation_reason)
)
@register()
-class cert_remove_hold(VirtualCommand):
+class cert_remove_hold(PKQuery, BaseCertMethod, VirtualCommand):
__doc__ = _('Take a revoked certificate off hold.')
- takes_args = _serial_number
-
has_output_params = (
Flag('unrevoked',
label=_('Unrevoked'),
@@ -740,17 +766,23 @@ class cert_remove_hold(VirtualCommand):
)
operation = "certificate remove hold"
+ def get_options(self):
+ for option in super(cert_remove_hold, self).get_options():
+ if option.name == 'cacn':
+ continue
+ yield option
+
def execute(self, serial_number, **kw):
ca_enabled_check()
self.check_access()
return dict(
- result=self.Backend.ra.take_certificate_off_hold(serial_number)
+ result=self.Backend.ra.take_certificate_off_hold(
+ str(serial_number))
)
-
@register()
-class cert_find(Command):
+class cert_find(Search, BaseCertMethod):
__doc__ = _('Search for existing certificates.')
takes_options = (
@@ -759,26 +791,6 @@ class cert_find(Command):
doc=_('Subject'),
autofill=False,
),
- Str('cacn?',
- cli_name='ca',
- query=True,
- label=_('Issuing CA'),
- doc=_('Name of issuing CA'),
- autofill=False,
- ),
- Str('issuer?',
- label=_('Issuer'),
- doc=_('Issuer DN'),
- autofill=False,
- ),
- Int('revocation_reason?',
- label=_('Reason'),
- doc=_('Reason for revoking the certificate (0-10). Type '
- '"ipa help cert" for revocation reason details.'),
- minvalue=0,
- maxvalue=10,
- autofill=False,
- ),
Int('min_serial_number?',
doc=_("minimum serial number"),
autofill=False,
@@ -795,60 +807,55 @@ class cert_find(Command):
doc=_('match the common name exactly'),
autofill=False,
),
- Str('validnotafter_from?', validate_pkidate,
+ DateTime('validnotafter_from?',
doc=_('Valid not after from this date (YYYY-mm-dd)'),
+ normalizer=normalize_pkidate,
autofill=False,
),
- Str('validnotafter_to?', validate_pkidate,
+ DateTime('validnotafter_to?',
doc=_('Valid not after to this date (YYYY-mm-dd)'),
+ normalizer=normalize_pkidate,
autofill=False,
),
- Str('validnotbefore_from?', validate_pkidate,
+ DateTime('validnotbefore_from?',
doc=_('Valid not before from this date (YYYY-mm-dd)'),
+ normalizer=normalize_pkidate,
autofill=False,
),
- Str('validnotbefore_to?', validate_pkidate,
+ DateTime('validnotbefore_to?',
doc=_('Valid not before to this date (YYYY-mm-dd)'),
+ normalizer=normalize_pkidate,
autofill=False,
),
- Str('issuedon_from?', validate_pkidate,
+ DateTime('issuedon_from?',
doc=_('Issued on from this date (YYYY-mm-dd)'),
+ normalizer=normalize_pkidate,
autofill=False,
),
- Str('issuedon_to?', validate_pkidate,
+ DateTime('issuedon_to?',
doc=_('Issued on to this date (YYYY-mm-dd)'),
+ normalizer=normalize_pkidate,
autofill=False,
),
- Str('revokedon_from?', validate_pkidate,
+ DateTime('revokedon_from?',
doc=_('Revoked on from this date (YYYY-mm-dd)'),
+ normalizer=normalize_pkidate,
autofill=False,
),
- Str('revokedon_to?', validate_pkidate,
+ DateTime('revokedon_to?',
doc=_('Revoked on to this date (YYYY-mm-dd)'),
+ normalizer=normalize_pkidate,
autofill=False,
),
+ Flag('pkey_only?',
+ label=_("Primary key only"),
+ doc=_("Results should contain primary key attribute only "
+ "(\"certificate\")"),
+ ),
Int('sizelimit?',
- label=_('Size Limit'),
- doc=_('Maximum number of certs returned'),
- flags=['no_display'],
+ label=_("Size Limit"),
+ doc=_("Maximum number of entries returned (0 is unlimited)"),
minvalue=0,
- default=100,
- ),
- )
-
- has_output = output.standard_list_of_entries
- has_output_params = (
- Str('serial_number_hex',
- label=_('Serial number (hex)'),
- ),
- Str('serial_number',
- label=_('Serial number'),
- ),
- Str('status',
- label=_('Status'),
- ),
- Str('subject',
- label=_('Subject'),
),
)
@@ -856,7 +863,8 @@ class cert_find(Command):
'%(count)d certificate matched', '%(count)d certificates matched', 0
)
- def execute(self, **options):
+ def execute(self, criteria=None, all=False, raw=False, pkey_only=False,
+ sizelimit=None, **options):
ca_enabled_check()
if 'cacn' in options:
@@ -870,8 +878,48 @@ class cert_find(Command):
else:
options['issuer'] = ca_sdn
+ if criteria is not None:
+ return dict(result=[], count=0, truncated=False)
+
+ obj_seq = []
+
+ ra_options = {}
+ for name, value in options.items():
+ if isinstance(value, datetime.datetime):
+ value = value.strftime(PKIDATE_FORMAT)
+ ra_options[name] = value
+ if sizelimit is not None and sizelimit != 0:
+ ra_options['sizelimit'] = sizelimit
+
+ for ra_obj in self.Backend.ra.find(ra_options):
+ obj = {}
+ if all:
+ ra_obj.update(
+ self.Backend.ra.get_certificate(
+ str(ra_obj['serial_number'])))
+ obj_seq.append(obj)
+ obj.update(ra_obj)
+
+ result = []
+ for obj in obj_seq:
+ if not pkey_only:
+ if not raw:
+ if 'certificate' in obj:
+ obj['certificate'] = (
+ obj['certificate'].replace('\r\n', ''))
+ self.obj._parse(obj)
+ obj['subject'] = DN(obj['subject'])
+ obj['issuer'] = DN(obj['issuer'])
+ obj['revoked'] = (
+ obj['status'] in (u'REVOKED', u'REVOKED_EXPIRED'))
+ else:
+ serial_number = obj['serial_number']
+ obj.clear()
+ obj['serial_number'] = serial_number
+ result.append(obj)
+
ret = dict(
- result=self.Backend.ra.find(options)
+ result=result
)
ret['count'] = len(ret['result'])
ret['truncated'] = False
--
2.9.0
From e9cd4414b2a54c9532d027e8d32f758913f6ae9d Mon Sep 17 00:00:00 2001
From: Jan Cholasta <jchol...@redhat.com>
Date: Tue, 14 Jun 2016 09:09:10 +0200
Subject: [PATCH 3/4] cert: add owner information
Get owner information from LDAP in cert-show and cert-find. Allow search by
owner in cert-find.
https://fedorahosted.org/freeipa/ticket/5381
---
API.txt | 13 ++-
VERSION | 4 +-
ipaserver/plugins/cert.py | 272 ++++++++++++++++++++++++++++++++++++++++------
3 files changed, 254 insertions(+), 35 deletions(-)
diff --git a/API.txt b/API.txt
index 4d16c50..ba94279 100644
--- a/API.txt
+++ b/API.txt
@@ -723,23 +723,31 @@ output: Entry('result')
output: Output('summary', type=[<type 'unicode'>, <type 'NoneType'>])
output: PrimaryKey('value')
command: cert_find
-args: 1,20,4
+args: 1,28,4
arg: Str('criteria?')
option: Flag('all', autofill=True, cli_name='all', default=False)
option: Str('cacn?', cli_name='ca')
option: Flag('exactly?', autofill=True, default=False)
+option: Str('host*', cli_name='hosts')
option: DateTime('issuedon_from?', autofill=False)
option: DateTime('issuedon_to?', autofill=False)
option: DNParam('issuer?', autofill=False)
option: Int('max_serial_number?', autofill=False)
option: Int('min_serial_number?', autofill=False)
+option: Str('no_host*', cli_name='no_hosts')
+option: Flag('no_members', autofill=True, default=True)
+option: Str('no_service*', cli_name='no_services')
+option: Str('no_user*', cli_name='no_users')
option: Flag('pkey_only?', autofill=True, default=False)
option: Flag('raw', autofill=True, cli_name='raw', default=False)
option: Int('revocation_reason?', autofill=False)
option: DateTime('revokedon_from?', autofill=False)
option: DateTime('revokedon_to?', autofill=False)
+option: Str('service*', cli_name='services')
option: Int('sizelimit?')
option: Str('subject?', autofill=False)
+option: Int('timelimit?')
+option: Str('user*', cli_name='users')
option: DateTime('validnotafter_from?', autofill=False)
option: DateTime('validnotafter_to?', autofill=False)
option: DateTime('validnotbefore_from?', autofill=False)
@@ -775,10 +783,11 @@ option: Int('revocation_reason', autofill=True, default=0)
option: Str('version?')
output: Output('result')
command: cert_show
-args: 1,5,3
+args: 1,6,3
arg: Int('serial_number')
option: Flag('all', autofill=True, cli_name='all', default=False)
option: Str('cacn?', cli_name='ca')
+option: Flag('no_members', autofill=True, default=False)
option: Str('out?')
option: Flag('raw', autofill=True, cli_name='raw', default=False)
option: Str('version?')
diff --git a/VERSION b/VERSION
index 32e9b79..46e17b9 100644
--- a/VERSION
+++ b/VERSION
@@ -90,5 +90,5 @@ IPA_DATA_VERSION=20100614120000
# #
########################################################
IPA_API_VERSION_MAJOR=2
-IPA_API_VERSION_MINOR=194
-# Last change: cert: add object plugin
+IPA_API_VERSION_MINOR=195
+# Last change: cert: add owner information
diff --git a/ipaserver/plugins/cert.py b/ipaserver/plugins/cert.py
index 3b180ee..39144dd 100644
--- a/ipaserver/plugins/cert.py
+++ b/ipaserver/plugins/cert.py
@@ -19,6 +19,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import base64
import binascii
import datetime
import os
@@ -110,6 +111,9 @@ EXAMPLES:
Search for certificates based on issuance date
ipa cert-find --issuedon-from=2013-02-01 --issuedon-to=2013-02-07
+ Search for certificates owned by a specific user:
+ ipa cert-find --user=user
+
IPA currently immediately issues (or declines) all certificate requests so
the status of a request is not normally useful. This is for future use
or the case where a CA does not immediately issue a certificate.
@@ -652,9 +656,48 @@ class cert(BaseCertObject):
param = param.clone(flags=param.flags - {'no_search'})
yield param
+ for owner in self._owners():
+ yield owner.primary_key.clone_rename(
+ 'owner_{0}'.format(owner.name),
+ required=False,
+ multivalue=True,
+ primary_key=False,
+ label=_("Owner %s") % owner.object_name,
+ flags={'no_create', 'no_update', 'no_search'},
+ )
+
+ def _owners(self):
+ for name in ('user', 'host', 'service'):
+ yield self.api.Object[name]
+
+ def _fill_owners(self, obj):
+ for owner in self._owners():
+ container_dn = DN(owner.container_dn, self.api.env.basedn)
+ name = 'owner_' + owner.name
+ for dn in obj['owner']:
+ if dn.endswith(container_dn, 1):
+ value = owner.get_primary_key_from_dn(dn)
+ obj.setdefault(name, []).append(value)
+
+
+class CertMethod(BaseCertMethod):
+ def get_options(self):
+ for option in super(CertMethod, self).get_options():
+ yield option
+
+ for o in self.has_output:
+ if isinstance(o, (output.Entry, output.ListOfEntries)):
+ yield Flag(
+ 'no_members',
+ doc=_("Suppress processing of membership attributes."),
+ exclude='webui',
+ flags={'no_output'},
+ )
+ break
+
@register()
-class cert_show(Retrieve, BaseCertMethod, VirtualCommand):
+class cert_show(Retrieve, CertMethod, VirtualCommand):
__doc__ = _('Retrieve an existing certificate.')
takes_options = (
@@ -667,7 +710,8 @@ class cert_show(Retrieve, BaseCertMethod, VirtualCommand):
operation="retrieve certificate"
- def execute(self, serial_number, all=False, raw=False, **options):
+ def execute(self, serial_number, all=False, raw=False, no_members=False,
+ **options):
ca_enabled_check()
hostname = None
try:
@@ -697,10 +741,26 @@ class cert_show(Retrieve, BaseCertMethod, VirtualCommand):
"issued by CA '%(ca)s' not found")
% dict(serial=serial_number, ca=options['cacn']))
+ if all or not no_members:
+ ldap = self.api.Backend.ldap2
+ filter = ldap.make_filter_from_attr(
+ 'usercertificate', base64.b64decode(result['certificate']))
+ try:
+ entries = ldap.get_entries(base_dn=self.api.env.basedn,
+ filter=filter,
+ attrs_list=[''])
+ except errors.EmptyResult:
+ entries = []
+ for entry in entries:
+ result.setdefault('owner', []).append(entry.dn)
+
if not raw:
result['certificate'] = result['certificate'].replace('\r\n', '')
self.obj._parse(result)
result['revoked'] = ('revocation_reason' in result)
+ if 'owner' in result:
+ self.obj._fill_owners(result)
+ del result['owner']
if hostname:
# If we have a hostname we want to verify that the subject
@@ -712,7 +772,7 @@ class cert_show(Retrieve, BaseCertMethod, VirtualCommand):
@register()
-class cert_revoke(PKQuery, BaseCertMethod, VirtualCommand):
+class cert_revoke(PKQuery, CertMethod, VirtualCommand):
__doc__ = _('Revoke a certificate.')
operation = "revoke certificate"
@@ -753,7 +813,7 @@ class cert_revoke(PKQuery, BaseCertMethod, VirtualCommand):
@register()
-class cert_remove_hold(PKQuery, BaseCertMethod, VirtualCommand):
+class cert_remove_hold(PKQuery, CertMethod, VirtualCommand):
__doc__ = _('Take a revoked certificate off hold.')
has_output_params = (
@@ -782,7 +842,7 @@ class cert_remove_hold(PKQuery, BaseCertMethod, VirtualCommand):
@register()
-class cert_find(Search, BaseCertMethod):
+class cert_find(Search, CertMethod):
__doc__ = _('Search for existing certificates.')
takes_options = (
@@ -852,6 +912,11 @@ class cert_find(Search, BaseCertMethod):
doc=_("Results should contain primary key attribute only "
"(\"certificate\")"),
),
+ Int('timelimit?',
+ label=_('Time Limit'),
+ doc=_('Time limit of search in seconds (0 is unlimited)'),
+ minvalue=0,
+ ),
Int('sizelimit?',
label=_("Size Limit"),
doc=_("Maximum number of entries returned (0 is unlimited)"),
@@ -863,9 +928,65 @@ class cert_find(Search, BaseCertMethod):
'%(count)d certificate matched', '%(count)d certificates matched', 0
)
+ def get_options(self):
+ for option in super(cert_find, self).get_options():
+ if option.name == 'no_members':
+ option = option.clone(default=True,
+ flags=set(option.flags) | {'no_option'})
+ yield option
+
+ for owner in self.obj._owners():
+ yield owner.primary_key.clone_rename(
+ '{0}'.format(owner.name),
+ required=False,
+ multivalue=True,
+ primary_key=False,
+ query=True,
+ cli_name='{0}s'.format(owner.name),
+ doc=(_("Search for certificates with these owner %s.") %
+ owner.object_name_plural),
+ label=owner.object_name,
+ )
+ yield owner.primary_key.clone_rename(
+ 'no_{0}'.format(owner.name),
+ required=False,
+ multivalue=True,
+ primary_key=False,
+ query=True,
+ cli_name='no_{0}s'.format(owner.name),
+ doc=(_("Search for certificates without these owner %s.") %
+ owner.object_name_plural),
+ label=owner.object_name,
+ )
+
def execute(self, criteria=None, all=False, raw=False, pkey_only=False,
- sizelimit=None, **options):
- ca_enabled_check()
+ no_members=True, timelimit=None, sizelimit=None, **options):
+ ca_options = {'cacn',
+ 'revocation_reason',
+ 'issuer',
+ 'subject',
+ 'min_serial_number', 'max_serial_number',
+ 'exactly',
+ 'validnotafter_from', 'validnotafter_to',
+ 'validnotbefore_from', 'validnotbefore_to',
+ 'issuedon_from', 'issuedon_to',
+ 'revokedon_from', 'revokedon_to'}
+ ldap_options = {prefix + owner.name
+ for owner in self.obj._owners()
+ for prefix in ('', 'no_')}
+ has_ca_options = (
+ any(name in options for name in ca_options - {'exactly'}) or
+ options['exactly'])
+ has_ldap_options = any(name in options for name in ldap_options)
+
+ try:
+ ca_enabled_check()
+ except errors.NotFound:
+ if has_ca_options:
+ raise
+ ca_enabled = False
+ else:
+ ca_enabled = True
if 'cacn' in options:
ca_obj = api.Command.ca_show(options['cacn'])['result']
@@ -882,47 +1003,136 @@ class cert_find(Search, BaseCertMethod):
return dict(result=[], count=0, truncated=False)
obj_seq = []
+ obj_dict = {}
+ truncated = False
+
+ if ca_enabled:
+ ra_options = {}
+ for name, value in options.items():
+ if name not in ca_options:
+ continue
+ if isinstance(value, datetime.datetime):
+ value = value.strftime(PKIDATE_FORMAT)
+ ra_options[name] = value
+ if sizelimit is not None:
+ if sizelimit != 0:
+ ra_options['sizelimit'] = sizelimit
+ sizelimit = 0
+ has_ca_options = True
+
+ for ra_obj in self.Backend.ra.find(ra_options):
+ obj = {}
+ if ((not pkey_only and all) or
+ not no_members or
+ not has_ca_options or
+ has_ldap_options):
+ ra_obj.update(
+ self.Backend.ra.get_certificate(
+ str(ra_obj['serial_number'])))
+ cert = base64.b64decode(ra_obj['certificate'])
+ obj_dict[cert] = obj
+ obj_seq.append(obj)
+ obj.update(ra_obj)
+
+ if ((not pkey_only and all) or
+ not no_members or
+ not has_ca_options or
+ has_ldap_options):
+ ldap = self.api.Backend.ldap2
+
+ filters = []
+ cert_filter = '(usercertificate=*)'
+ filters.append(cert_filter)
+ for owner in self.obj._owners():
+ oc_filter = ldap.make_filter_from_attr(
+ 'objectclass', owner.object_class, ldap.MATCH_ALL)
+ for prefix, rule in (('', ldap.MATCH_ALL),
+ ('no_', ldap.MATCH_NONE)):
+ value = options.get(prefix + owner.name)
+ if value is None:
+ continue
+ pkey_filter = ldap.make_filter_from_attr(
+ owner.primary_key.name, value, rule)
+ filters.append(oc_filter)
+ filters.append(pkey_filter)
+ filter = ldap.combine_filters(filters, ldap.MATCH_ALL)
- ra_options = {}
- for name, value in options.items():
- if isinstance(value, datetime.datetime):
- value = value.strftime(PKIDATE_FORMAT)
- ra_options[name] = value
- if sizelimit is not None and sizelimit != 0:
- ra_options['sizelimit'] = sizelimit
-
- for ra_obj in self.Backend.ra.find(ra_options):
- obj = {}
- if all:
- ra_obj.update(
- self.Backend.ra.get_certificate(
- str(ra_obj['serial_number'])))
- obj_seq.append(obj)
- obj.update(ra_obj)
+ try:
+ entries, truncated = ldap.find_entries(
+ base_dn=self.api.env.basedn,
+ filter=filter,
+ attrs_list=['usercertificate'],
+ time_limit=timelimit,
+ size_limit=sizelimit,
+ )
+ except errors.EmptyResult:
+ entries, truncated = [], False
+ for entry in entries:
+ seen = set()
+ for attr in ('usercertificate', 'usercertificate;binary'):
+ for cert in entry.get(attr, []):
+ if cert in seen:
+ continue
+ seen.add(cert)
+ try:
+ obj = obj_dict[cert]
+ except KeyError:
+ if has_ca_options:
+ continue
+ obj = {
+ 'certificate': unicode(base64.b64encode(cert))}
+ obj_seq.append(obj)
+ obj_dict[cert] = obj
+ obj.setdefault('owner', []).append(entry.dn)
result = []
for obj in obj_seq:
+ if has_ldap_options and 'owner' not in obj:
+ continue
if not pkey_only:
if not raw:
if 'certificate' in obj:
obj['certificate'] = (
obj['certificate'].replace('\r\n', ''))
self.obj._parse(obj)
- obj['subject'] = DN(obj['subject'])
- obj['issuer'] = DN(obj['issuer'])
- obj['revoked'] = (
- obj['status'] in (u'REVOKED', u'REVOKED_EXPIRED'))
+ if not all:
+ del obj['certificate']
+ del obj['valid_not_before']
+ del obj['valid_not_after']
+ del obj['md5_fingerprint']
+ del obj['sha1_fingerprint']
+ if 'subject' in obj:
+ obj['subject'] = DN(obj['subject'])
+ if 'issuer' in obj:
+ obj['issuer'] = DN(obj['issuer'])
+ if 'status' in obj:
+ obj['revoked'] = (
+ obj['status'] in (u'REVOKED', u'REVOKED_EXPIRED'))
+ if 'owner' in obj:
+ if all or not no_members:
+ self.obj._fill_owners(obj)
+ del obj['owner']
+ else:
+ if 'certificate' in obj:
+ if not all:
+ del obj['certificate']
+ if 'owner' in obj:
+ if not all and no_members:
+ del obj['owner']
else:
- serial_number = obj['serial_number']
- obj.clear()
- obj['serial_number'] = serial_number
+ if 'serial_number' in obj:
+ serial_number = obj['serial_number']
+ obj.clear()
+ obj['serial_number'] = serial_number
+ else:
+ obj.clear()
result.append(obj)
ret = dict(
result=result
)
ret['count'] = len(ret['result'])
- ret['truncated'] = False
+ ret['truncated'] = bool(truncated)
return ret
--
2.9.0
From 936629adfbb6a0cb5be6d2cd5693db5a6aee2174 Mon Sep 17 00:00:00 2001
From: Jan Cholasta <jchol...@redhat.com>
Date: Tue, 14 Jun 2016 09:44:22 +0200
Subject: [PATCH 4/4] cert: allow search by certificate
Allow search by certificate data or file in cert-find.
https://fedorahosted.org/freeipa/ticket/5381
---
API.txt | 3 ++-
VERSION | 4 ++--
ipaclient/plugins/cert.py | 23 +++++++++++++++++++++++
ipaserver/plugins/cert.py | 48 +++++++++++++++++++++++++++++++++++++++--------
4 files changed, 67 insertions(+), 11 deletions(-)
diff --git a/API.txt b/API.txt
index ba94279..142ffcd 100644
--- a/API.txt
+++ b/API.txt
@@ -723,10 +723,11 @@ output: Entry('result')
output: Output('summary', type=[<type 'unicode'>, <type 'NoneType'>])
output: PrimaryKey('value')
command: cert_find
-args: 1,28,4
+args: 1,29,4
arg: Str('criteria?')
option: Flag('all', autofill=True, cli_name='all', default=False)
option: Str('cacn?', cli_name='ca')
+option: Bytes('certificate?', autofill=False)
option: Flag('exactly?', autofill=True, default=False)
option: Str('host*', cli_name='hosts')
option: DateTime('issuedon_from?', autofill=False)
diff --git a/VERSION b/VERSION
index 46e17b9..fe7614e 100644
--- a/VERSION
+++ b/VERSION
@@ -90,5 +90,5 @@ IPA_DATA_VERSION=20100614120000
# #
########################################################
IPA_API_VERSION_MAJOR=2
-IPA_API_VERSION_MINOR=195
-# Last change: cert: add owner information
+IPA_API_VERSION_MINOR=196
+# Last change: cert: allow search by certificate
diff --git a/ipaclient/plugins/cert.py b/ipaclient/plugins/cert.py
index 7e8e156..de4318b 100644
--- a/ipaclient/plugins/cert.py
+++ b/ipaclient/plugins/cert.py
@@ -25,6 +25,7 @@ from ipalib import x509
from ipalib import util
from ipalib.parameters import File
from ipalib.plugable import Registry
+from ipalib.text import _
register = Registry()
@@ -51,3 +52,25 @@ class cert_show(MethodOverride):
raise errors.NoCertificateError(entry=keys[-1])
else:
return super(cert_show, self).forward(*keys, **options)
+
+
+@register(override=True)
+class cert_find(MethodOverride):
+ takes_options = (
+ File(
+ 'file?',
+ label=_("Input filename"),
+ doc=_('File to load the certificate from.'),
+ include='cli',
+ ),
+ )
+
+ def forward(self, *args, **options):
+ if self.api.env.context == 'cli':
+ if 'certificate' in options and 'file' in options:
+ raise errors.MutuallyExclusiveError(
+ reason=_("cannot specify both raw certificate and file"))
+ if 'certificate' not in options and 'file' in options:
+ options['certificate'] = x509.strip_header(options.pop('file'))
+
+ return super(cert_find, self).forward(*args, **options)
diff --git a/ipaserver/plugins/cert.py b/ipaserver/plugins/cert.py
index 39144dd..564d582 100644
--- a/ipaserver/plugins/cert.py
+++ b/ipaserver/plugins/cert.py
@@ -114,6 +114,12 @@ EXAMPLES:
Search for certificates owned by a specific user:
ipa cert-find --user=user
+ Examine a certificate:
+ ipa cert-find --file=cert.pem --all
+
+ Verify that a certificate is owner by a specific user:
+ ipa cert-find --file=cert.pem --user=user
+
IPA currently immediately issues (or declines) all certificate requests so
the status of a request is not normally useful. This is for future use
or the case where a CA does not immediately issue a certificate.
@@ -239,12 +245,17 @@ def caacl_check(principal_type, principal_string, ca, profile_id):
)
+def validate_certificate(value):
+ return x509.validate_certificate(value, x509.DER)
+
+
class BaseCertObject(Object):
takes_params = (
Bytes(
- 'certificate',
+ 'certificate', validate_certificate,
label=_("Certificate"),
doc=_("Base-64 encoded certificate."),
+ normalizer=x509.normalize_certificate,
flags={'no_create', 'no_update', 'no_search'},
),
DNParam(
@@ -652,7 +663,7 @@ class cert(BaseCertObject):
for param in super(cert, self).get_params():
if param.name == 'serial_number':
param = param.clone(primary_key=True)
- elif param.name == 'issuer':
+ elif param.name in ('certificate', 'issuer'):
param = param.clone(flags=param.flags - {'no_search'})
yield param
@@ -978,6 +989,7 @@ class cert_find(Search, CertMethod):
any(name in options for name in ca_options - {'exactly'}) or
options['exactly'])
has_ldap_options = any(name in options for name in ldap_options)
+ has_cert_option = 'certificate' in options
try:
ca_enabled_check()
@@ -1006,6 +1018,12 @@ class cert_find(Search, CertMethod):
obj_dict = {}
truncated = False
+ if has_cert_option:
+ cert = options['certificate']
+ obj = {'certificate': unicode(base64.b64encode(cert))}
+ obj_seq.append(obj)
+ obj_dict[cert] = obj
+
if ca_enabled:
ra_options = {}
for name, value in options.items():
@@ -1025,23 +1043,37 @@ class cert_find(Search, CertMethod):
if ((not pkey_only and all) or
not no_members or
not has_ca_options or
- has_ldap_options):
+ has_ldap_options or
+ has_cert_option):
ra_obj.update(
self.Backend.ra.get_certificate(
str(ra_obj['serial_number'])))
cert = base64.b64decode(ra_obj['certificate'])
- obj_dict[cert] = obj
- obj_seq.append(obj)
+ try:
+ obj = obj_dict[cert]
+ except KeyError:
+ if has_cert_option:
+ continue
+ obj = {}
+ obj_seq.append(obj)
+ obj_dict[cert] = obj
+ else:
+ obj_seq.append(obj)
obj.update(ra_obj)
if ((not pkey_only and all) or
not no_members or
not has_ca_options or
- has_ldap_options):
+ has_ldap_options or
+ has_cert_option):
ldap = self.api.Backend.ldap2
filters = []
- cert_filter = '(usercertificate=*)'
+ if 'certificate' in options:
+ cert_filter = ldap.make_filter_from_attr(
+ 'usercertificate', options['certificate'])
+ else:
+ cert_filter = '(usercertificate=*)'
filters.append(cert_filter)
for owner in self.obj._owners():
oc_filter = ldap.make_filter_from_attr(
@@ -1077,7 +1109,7 @@ class cert_find(Search, CertMethod):
try:
obj = obj_dict[cert]
except KeyError:
- if has_ca_options:
+ if has_ca_options or has_cert_option:
continue
obj = {
'certificate': unicode(base64.b64encode(cert))}
--
2.9.0
--
Manage your subscription for the Freeipa-devel mailing list:
https://www.redhat.com/mailman/listinfo/freeipa-devel
Contribute to FreeIPA: http://www.freeipa.org/page/Contribute/Code