URL: https://github.com/freeipa/freeipa/pull/453 Author: tiran Title: #453: Cleanup certdb Action: synchronized
To pull the PR as Git branch: git remote add ghfreeipa https://github.com/freeipa/freeipa git fetch ghfreeipa pull/453/head:pr453 git checkout pr453
From be337e545ddfb65a34e3eb7087023f78480fbe8a Mon Sep 17 00:00:00 2001 From: Christian Heimes <chei...@redhat.com> Date: Thu, 9 Feb 2017 14:55:45 +0100 Subject: [PATCH] Cleanup certdb * use with statement to open/close files * prefer fchmod/fchown when a file descriptor is available * set permission before data is written to file Signed-off-by: Christian Heimes <chei...@redhat.com> --- ipaserver/install/certs.py | 135 +++++++++++++++++++++------------------------ 1 file changed, 63 insertions(+), 72 deletions(-) diff --git a/ipaserver/install/certs.py b/ipaserver/install/certs.py index bca2504..47bb23e 100644 --- a/ipaserver/install/certs.py +++ b/ipaserver/install/certs.py @@ -96,10 +96,12 @@ def __init__(self, realm, nssdir=paths.IPA_RADB_DIR, fstore=None, self.host_name = host_name self.ca_subject = ca_subject self.subject_base = subject_base + try: - self.cwd = os.getcwd() + self.cwd = os.path.abspath(os.getcwd()) except OSError as e: - raise RuntimeError("Unable to determine the current directory: %s" % str(e)) + raise RuntimeError( + "Unable to determine the current directory: %s" % str(e)) self.cacert_name = get_ca_nickname(self.realm) @@ -142,6 +144,8 @@ def passwd_fname(self): def __del__(self): if self.reqdir is not None: shutil.rmtree(self.reqdir, ignore_errors=True) + self.reqdir = None + self.nssdb.close() try: os.chdir(self.cwd) except OSError: @@ -162,22 +166,20 @@ def setup_cert_request(self): self.certreq_fname = self.reqdir + "/tmpcertreq" self.certder_fname = self.reqdir + "/tmpcert.der" - # When certutil makes a request it creates a file in the cwd, make - # sure we are in a unique place when this happens - os.chdir(self.reqdir) - - def set_perms(self, fname, write=False, uid=None): - if uid: - pent = pwd.getpwnam(uid) - os.chown(fname, pent.pw_uid, pent.pw_gid) - else: - os.chown(fname, self.uid, self.gid) + def set_perms(self, fname, write=False): perms = stat.S_IRUSR if write: perms |= stat.S_IWUSR - os.chmod(fname, perms) + if hasattr(fname, 'fileno'): + os.fchown(fname.fileno(), self.uid, self.gid) + os.fchmod(fname.fileno(), perms) + else: + os.chown(fname, self.uid, self.gid) + os.chmod(fname, perms) def run_certutil(self, args, stdin=None, **kwargs): + # When certutil makes a request it creates a file in the cwd, make + # sure we are in a unique place when this happens return self.nssdb.run_certutil(args, stdin, **kwargs) def run_signtool(self, args, stdin=None): @@ -191,19 +193,18 @@ def run_signtool(self, args, stdin=None): def create_noise_file(self): if ipautil.file_exists(self.noise_fname): os.remove(self.noise_fname) - f = open(self.noise_fname, "w") - f.write(ipautil.ipa_generate_password()) - self.set_perms(self.noise_fname) + with open(self.noise_fname, "w") as f: + self.set_perms(f) + f.write(ipautil.ipa_generate_password()) def create_passwd_file(self, passwd=None): ipautil.backup_file(self.passwd_fname) - f = open(self.passwd_fname, "w") - if passwd is not None: - f.write("%s\n" % passwd) - else: - f.write(ipautil.ipa_generate_password()) - f.close() - self.set_perms(self.passwd_fname) + with open(self.passwd_fname, "w") as f: + self.set_perms(f) + if passwd is not None: + f.write("%s\n" % passwd) + else: + f.write(ipautil.ipa_generate_password()) def create_certdbs(self): self.nssdb.create_db(user=self.user, group=self.group, mode=self.mode, @@ -241,13 +242,13 @@ def export_ca_cert(self, nickname, create_pkcs12=False): # export the CA cert for use with other apps ipautil.backup_file(cacert_fname) root_nicknames = self.find_root_cert(nickname)[:-1] - fd = open(cacert_fname, "w") - for root in root_nicknames: - result = self.run_certutil(["-L", "-n", root, "-a"], - capture_output=True) - fd.write(result.output) - fd.close() - os.chmod(cacert_fname, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH) + with open(cacert_fname, "w") as f: + os.fchmod(f.fileno(), stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH) + for root in root_nicknames: + result = self.run_certutil(["-L", "-n", root, "-a"], + capture_output=True) + f.write(result.output) + if create_pkcs12: ipautil.backup_file(self.pk12_fname) ipautil.run([paths.PK12UTIL, "-d", self.secdir, @@ -262,9 +263,8 @@ def load_cacert(self, cacert_fname, trust_flags): Load all the certificates from a given file. It is assumed that this file creates CA certificates. """ - fd = open(cacert_fname) - certs = fd.read() - fd.close() + with open(cacert_fname) as f: + certs = f.read() st = 0 while True: @@ -345,14 +345,14 @@ def create_server_cert(self, nickname, hostname, other_certdb=None, subject=None if subject is None: subject=DN(('CN', hostname), self.subject_base) self.request_cert(subject, san_dnsnames=[hostname]) - cdb.issue_server_cert(self.certreq_fname, self.certder_fname) - self.import_cert(self.certder_fname, nickname) - fd = open(self.certder_fname, "r") - dercert = fd.read() - fd.close() - - os.unlink(self.certreq_fname) - os.unlink(self.certder_fname) + try: + cdb.issue_server_cert(self.certreq_fname, self.certder_fname) + self.import_cert(self.certder_fname, nickname) + with open(self.certder_fname, "r") as f: + dercert = f.read() + finally: + os.unlink(self.certreq_fname) + os.unlink(self.certder_fname) return dercert @@ -382,10 +382,8 @@ def issue_server_cert(self, certreq_fname, cert_fname): if self.host_name is None: raise RuntimeError("CA Host is not set.") - f = open(certreq_fname, "r") - csr = f.readlines() - f.close() - csr = "".join(csr) + with open(certreq_fname, "r") as f: + csr = f.read() # We just want the CSR bits, make sure there is nothing else csr = pkcs10.strip_header(csr) @@ -397,9 +395,9 @@ def issue_server_cert(self, certreq_fname, cert_fname): 'xmlOutput': 'true'} # Send the request to the CA - f = open(self.passwd_fname, "r") - password = f.readline() - f.close() + with open(self.passwd_fname, "r") as f: + password = f.readline() + result = dogtag.https_request( self.host_name, 8443, "/ca/ee/ca/profileSubmitSSLClient", self.secdir, password, "ipaCert", **params) @@ -426,9 +424,8 @@ def issue_server_cert(self, certreq_fname, cert_fname): # Write the certificate to a file. It will be imported in a later # step. This file will be read later to be imported. - f = open(cert_fname, "w") - f.write(cert) - f.close() + with open(cert_fname, "w") as f: + f.write(cert) def issue_signing_cert(self, certreq_fname, cert_fname): self.setup_cert_request() @@ -436,10 +433,8 @@ def issue_signing_cert(self, certreq_fname, cert_fname): if self.host_name is None: raise RuntimeError("CA Host is not set.") - f = open(certreq_fname, "r") - csr = f.readlines() - f.close() - csr = "".join(csr) + with open(certreq_fname, "r") as f: + csr = f.read() # We just want the CSR bits, make sure there is no thing else csr = pkcs10.strip_header(csr) @@ -451,9 +446,9 @@ def issue_signing_cert(self, certreq_fname, cert_fname): 'xmlOutput': 'true'} # Send the request to the CA - f = open(self.passwd_fname, "r") - password = f.readline() - f.close() + with open(self.passwd_fname, "r") as f: + password = f.readline() + result = dogtag.https_request( self.host_name, 8443, "/ca/ee/ca/profileSubmitSSLClient", self.secdir, password, "ipaCert", **params) @@ -472,9 +467,8 @@ def issue_signing_cert(self, certreq_fname, cert_fname): # Write the certificate to a file. It will be imported in a later # step. This file will be read later to be imported. - f = open(cert_fname, "w") - f.write(cert) - f.close() + with open(cert_fname, "w") as f: + f.write(cert) def add_cert(self, cert, nick, flags, pem=False): self.nssdb.add_cert(cert, nick, flags, pem) @@ -497,13 +491,11 @@ def create_pin_file(self): This is the format of Directory Server pin files. """ ipautil.backup_file(self.pin_fname) - f = open(self.pin_fname, "w") - f.write("Internal (Software) Token:") - pwdfile = open(self.passwd_fname) - f.write(pwdfile.read()) - f.close() - pwdfile.close() - self.set_perms(self.pin_fname) + with open(self.pin_fname, "w") as pinfile: + self.set_perms(pinfile) + pinfile.write("Internal (Software) Token:") + with open(self.passwd_fname) as pwdfile: + pinfile.write(pwdfile.read()) def find_root_cert(self, nickname): """ @@ -553,10 +545,9 @@ def create_from_cacert(self): if ipautil.file_exists(self.certdb_fname): # We already have a cert db, see if it is for the same CA. # If it is we leave things as they are. - f = open(cacert_fname, "r") - newca = f.readlines() - f.close() - newca = "".join(newca) + with open(cacert_fname, "r") as f: + newca = f.read() + newca, _st = find_cert_from_txt(newca) cacert = self.get_cert_from_db(self.cacert_name)
-- Manage your subscription for the Freeipa-devel mailing list: https://www.redhat.com/mailman/listinfo/freeipa-devel Contribute to FreeIPA: http://www.freeipa.org/page/Contribute/Code