URL: https://github.com/freeipa/freeipa/pull/453
Author: tiran
 Title: #453: Cleanup certdb
Action: synchronized

To pull the PR as Git branch:
git remote add ghfreeipa https://github.com/freeipa/freeipa
git fetch ghfreeipa pull/453/head:pr453
git checkout pr453
From ff90bf51a9c82073be6de9a4c43e1d8271dcfa06 Mon Sep 17 00:00:00 2001
From: Christian Heimes <chei...@redhat.com>
Date: Thu, 9 Feb 2017 14:55:45 +0100
Subject: [PATCH] Cleanup certdb

* use with statement to open/close files
* prefer fchmod/fchown when a file descriptor is available
* set permission before data is written to file
* remove chdir() hack with proper cwd argument to ipautil.run()

Do not ever change the working directory of a program. It's a really bad
idea. Just consider what is going to happen if two threads or two
different parts of a process decide to own control over the working
directory?

Signed-off-by: Christian Heimes <chei...@redhat.com>
---
 ipaserver/install/certs.py | 162 +++++++++++++++++++++------------------------
 1 file changed, 74 insertions(+), 88 deletions(-)

diff --git a/ipaserver/install/certs.py b/ipaserver/install/certs.py
index 80918d4..a4f7149 100644
--- a/ipaserver/install/certs.py
+++ b/ipaserver/install/certs.py
@@ -103,10 +103,6 @@ def __init__(
         self.host_name = host_name
         self.ca_subject = ca_subject
         self.subject_base = subject_base
-        try:
-            self.cwd = os.getcwd()
-        except OSError as e:
-            raise RuntimeError("Unable to determine the current directory: %s" % str(e))
 
         self.cacert_name = get_ca_nickname(self.realm)
         self.valid_months = "120"
@@ -132,10 +128,8 @@ def __init__(
     def __del__(self):
         if self.reqdir is not None:
             shutil.rmtree(self.reqdir, ignore_errors=True)
-        try:
-            os.chdir(self.cwd)
-        except OSError:
-            pass
+            self.reqdir = None
+        self.nssdb.close()
 
     def setup_cert_request(self):
         """
@@ -152,23 +146,26 @@ def setup_cert_request(self):
         self.certreq_fname = self.reqdir + "/tmpcertreq"
         self.certder_fname = self.reqdir + "/tmpcert.der"
 
-        # When certutil makes a request it creates a file in the cwd, make
-        # sure we are in a unique place when this happens
-        os.chdir(self.reqdir)
-
-    def set_perms(self, fname, write=False, uid=None):
-        if uid:
-            pent = pwd.getpwnam(uid)
-            os.chown(fname, pent.pw_uid, pent.pw_gid)
+    def set_perms(self, fname, write=False, user=None):
+        if user is not None:
+            pent = pwd.getpwnam(user)
+            uid, gid = pent.pw_uid, pent.pw_gid
         else:
-            os.chown(fname, self.uid, self.gid)
+            uid, gid = self.uid, self.gid
         perms = stat.S_IRUSR
         if write:
             perms |= stat.S_IWUSR
-        os.chmod(fname, perms)
+        if hasattr(fname, 'fileno'):
+            os.fchown(fname.fileno(), uid, gid)
+            os.fchmod(fname.fileno(), perms)
+        else:
+            os.chown(fname, uid, gid)
+            os.chmod(fname, perms)
 
     def run_certutil(self, args, stdin=None, **kwargs):
-        return self.nssdb.run_certutil(args, stdin, **kwargs)
+        # When certutil makes a request it creates a file in the cwd, make
+        # sure we are in a unique place when this happens
+        return self.nssdb.run_certutil(args, stdin, cwd=self.reqdir, **kwargs)
 
     def run_signtool(self, args, stdin=None):
         with open(self.passwd_fname, "r") as f:
@@ -176,24 +173,23 @@ def run_signtool(self, args, stdin=None):
         new_args = [paths.SIGNTOOL, "-d", self.secdir, "-p", password]
 
         new_args = new_args + args
-        ipautil.run(new_args, stdin)
+        ipautil.run(new_args, stdin, cwd=self.reqdir)
 
     def create_noise_file(self):
         if ipautil.file_exists(self.noise_fname):
             os.remove(self.noise_fname)
-        f = open(self.noise_fname, "w")
-        f.write(ipautil.ipa_generate_password())
-        self.set_perms(self.noise_fname)
+        with open(self.noise_fname, "w") as f:
+            self.set_perms(f)
+            f.write(ipautil.ipa_generate_password())
 
     def create_passwd_file(self, passwd=None):
         ipautil.backup_file(self.passwd_fname)
-        f = open(self.passwd_fname, "w")
-        if passwd is not None:
-            f.write("%s\n" % passwd)
-        else:
-            f.write(ipautil.ipa_generate_password())
-        f.close()
-        self.set_perms(self.passwd_fname)
+        with open(self.passwd_fname, "w") as f:
+            self.set_perms(f)
+            if passwd is not None:
+                f.write("%s\n" % passwd)
+            else:
+                f.write(ipautil.ipa_generate_password())
 
     def create_certdbs(self):
         ipautil.backup_file(self.certdb_fname)
@@ -232,20 +228,21 @@ def export_ca_cert(self, nickname, create_pkcs12=False):
         # export the CA cert for use with other apps
         ipautil.backup_file(self.cacert_fname)
         root_nicknames = self.find_root_cert(nickname)[:-1]
-        fd = open(self.cacert_fname, "w")
-        for root in root_nicknames:
-            result = self.run_certutil(["-L", "-n", root, "-a"],
-                                       capture_output=True)
-            fd.write(result.output)
-        fd.close()
-        os.chmod(self.cacert_fname, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
+        with open(self.cacert_fname, "w") as f:
+            os.fchmod(f.fileno(), stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
+            for root in root_nicknames:
+                result = self.run_certutil(["-L", "-n", root, "-a"],
+                                           capture_output=True)
+                f.write(result.output)
+
         if create_pkcs12:
             ipautil.backup_file(self.pk12_fname)
             ipautil.run([paths.PK12UTIL, "-d", self.secdir,
                          "-o", self.pk12_fname,
                          "-n", self.cacert_name,
                          "-w", self.passwd_fname,
-                         "-k", self.passwd_fname])
+                         "-k", self.passwd_fname],
+                        cwd=self.reqdir)
             self.set_perms(self.pk12_fname)
 
     def load_cacert(self, cacert_fname, trust_flags):
@@ -253,9 +250,8 @@ def load_cacert(self, cacert_fname, trust_flags):
         Load all the certificates from a given file. It is assumed that
         this file creates CA certificates.
         """
-        fd = open(cacert_fname)
-        certs = fd.read()
-        fd.close()
+        with open(cacert_fname) as f:
+            certs = f.read()
 
         st = 0
         while True:
@@ -336,14 +332,14 @@ def create_server_cert(self, nickname, hostname, other_certdb=None, subject=None
         if subject is None:
             subject=DN(('CN', hostname), self.subject_base)
         self.request_cert(subject, san_dnsnames=[hostname])
-        cdb.issue_server_cert(self.certreq_fname, self.certder_fname)
-        self.import_cert(self.certder_fname, nickname)
-        fd = open(self.certder_fname, "r")
-        dercert = fd.read()
-        fd.close()
-
-        os.unlink(self.certreq_fname)
-        os.unlink(self.certder_fname)
+        try:
+            cdb.issue_server_cert(self.certreq_fname, self.certder_fname)
+            self.import_cert(self.certder_fname, nickname)
+            with open(self.certder_fname, "r") as f:
+                dercert = f.read()
+        finally:
+            os.unlink(self.certreq_fname)
+            os.unlink(self.certder_fname)
 
         return dercert
 
@@ -373,10 +369,8 @@ def issue_server_cert(self, certreq_fname, cert_fname):
         if self.host_name is None:
             raise RuntimeError("CA Host is not set.")
 
-        f = open(certreq_fname, "r")
-        csr = f.readlines()
-        f.close()
-        csr = "".join(csr)
+        with open(certreq_fname, "r") as f:
+            csr = f.read()
 
         # We just want the CSR bits, make sure there is nothing else
         csr = pkcs10.strip_header(csr)
@@ -388,9 +382,9 @@ def issue_server_cert(self, certreq_fname, cert_fname):
                 'xmlOutput': 'true'}
 
         # Send the request to the CA
-        f = open(self.passwd_fname, "r")
-        password = f.readline()
-        f.close()
+        with open(self.passwd_fname, "r") as f:
+            password = f.readline()
+
         result = dogtag.https_request(
             self.host_name, 8443, "/ca/ee/ca/profileSubmitSSLClient",
             self.secdir, password, "ipaCert", **params)
@@ -417,9 +411,8 @@ def issue_server_cert(self, certreq_fname, cert_fname):
 
         # Write the certificate to a file. It will be imported in a later
         # step. This file will be read later to be imported.
-        f = open(cert_fname, "w")
-        f.write(cert)
-        f.close()
+        with open(cert_fname, "w") as f:
+            f.write(cert)
 
     def issue_signing_cert(self, certreq_fname, cert_fname):
         self.setup_cert_request()
@@ -427,10 +420,8 @@ def issue_signing_cert(self, certreq_fname, cert_fname):
         if self.host_name is None:
             raise RuntimeError("CA Host is not set.")
 
-        f = open(certreq_fname, "r")
-        csr = f.readlines()
-        f.close()
-        csr = "".join(csr)
+        with open(certreq_fname, "r") as f:
+            csr = f.read()
 
         # We just want the CSR bits, make sure there is no thing else
         csr = pkcs10.strip_header(csr)
@@ -442,9 +433,9 @@ def issue_signing_cert(self, certreq_fname, cert_fname):
                 'xmlOutput': 'true'}
 
         # Send the request to the CA
-        f = open(self.passwd_fname, "r")
-        password = f.readline()
-        f.close()
+        with open(self.passwd_fname, "r") as f:
+            password = f.readline()
+
         result = dogtag.https_request(
             self.host_name, 8443, "/ca/ee/ca/profileSubmitSSLClient",
             self.secdir, password, "ipaCert", **params)
@@ -463,9 +454,8 @@ def issue_signing_cert(self, certreq_fname, cert_fname):
 
         # Write the certificate to a file. It will be imported in a later
         # step. This file will be read later to be imported.
-        f = open(cert_fname, "w")
-        f.write(cert)
-        f.close()
+        with open(cert_fname, "w") as f:
+            f.write(cert)
 
     def add_cert(self, cert, nick, flags, pem=False):
         self.nssdb.add_cert(cert, nick, flags, pem)
@@ -488,26 +478,22 @@ def create_pin_file(self):
         This is the format of Directory Server pin files.
         """
         ipautil.backup_file(self.pin_fname)
-        f = open(self.pin_fname, "w")
-        f.write("Internal (Software) Token:")
-        pwdfile = open(self.passwd_fname)
-        f.write(pwdfile.read())
-        f.close()
-        pwdfile.close()
-        self.set_perms(self.pin_fname)
+        with open(self.pin_fname, "w") as pinfile:
+            self.set_perms(pinfile)
+            pinfile.write("Internal (Software) Token:")
+            with open(self.passwd_fname) as pwdfile:
+                pinfile.write(pwdfile.read())
 
     def create_password_conf(self):
         """
         This is the format of mod_nss pin files.
         """
         ipautil.backup_file(self.pwd_conf)
-        f = open(self.pwd_conf, "w")
-        f.write("internal:")
-        pwdfile = open(self.passwd_fname)
-        f.write(pwdfile.read())
-        f.close()
-        pwdfile.close()
-        self.set_perms(self.pwd_conf, uid=constants.HTTPD_USER)
+        with open(self.pwd_conf, "w") as pwdconf:
+            self.set_perms(pwdconf, user=constants.HTTPD_USER)
+            pwdconf.write("internal:")
+            with open(self.passwd_fname) as pwdfile:
+                pwdconf.write(pwdfile.read())
 
     def find_root_cert(self, nickname):
         """
@@ -543,7 +529,8 @@ def export_pkcs12(self, pkcs12_fname, pkcs12_pwd_fname, nickname=None):
                      "-o", pkcs12_fname,
                      "-n", nickname,
                      "-k", self.passwd_fname,
-                     "-w", pkcs12_pwd_fname])
+                     "-w", pkcs12_pwd_fname],
+                    cwd=self.reqdir)
 
     def export_pem_p12(self, pkcs12_fname, pkcs12_pwd_fname,
                        nickname, pem_fname):
@@ -556,10 +543,9 @@ def create_from_cacert(self, cacert_fname, passwd=None):
         if ipautil.file_exists(self.certdb_fname):
             # We already have a cert db, see if it is for the same CA.
             # If it is we leave things as they are.
-            f = open(cacert_fname, "r")
-            newca = f.readlines()
-            f.close()
-            newca = "".join(newca)
+            with open(cacert_fname, "r") as f:
+                newca = f.read()
+
             newca, _st = find_cert_from_txt(newca)
 
             cacert = self.get_cert_from_db(self.cacert_name)
-- 
Manage your subscription for the Freeipa-devel mailing list:
https://www.redhat.com/mailman/listinfo/freeipa-devel
Contribute to FreeIPA: http://www.freeipa.org/page/Contribute/Code

Reply via email to