URL: https://github.com/freeipa/freeipa/pull/453
Author: tiran
 Title: #453: Cleanup certdb
Action: synchronized

To pull the PR as Git branch:
git remote add ghfreeipa https://github.com/freeipa/freeipa
git fetch ghfreeipa pull/453/head:pr453
git checkout pr453
From be337e545ddfb65a34e3eb7087023f78480fbe8a Mon Sep 17 00:00:00 2001
From: Christian Heimes <chei...@redhat.com>
Date: Thu, 9 Feb 2017 14:55:45 +0100
Subject: [PATCH] Cleanup certdb

* use with statement to open/close files
* prefer fchmod/fchown when a file descriptor is available
* set permission before data is written to file

Signed-off-by: Christian Heimes <chei...@redhat.com>
---
 ipaserver/install/certs.py | 135 +++++++++++++++++++++------------------------
 1 file changed, 63 insertions(+), 72 deletions(-)

diff --git a/ipaserver/install/certs.py b/ipaserver/install/certs.py
index bca2504..47bb23e 100644
--- a/ipaserver/install/certs.py
+++ b/ipaserver/install/certs.py
@@ -96,10 +96,12 @@ def __init__(self, realm, nssdir=paths.IPA_RADB_DIR, fstore=None,
         self.host_name = host_name
         self.ca_subject = ca_subject
         self.subject_base = subject_base
+
         try:
-            self.cwd = os.getcwd()
+            self.cwd = os.path.abspath(os.getcwd())
         except OSError as e:
-            raise RuntimeError("Unable to determine the current directory: %s" % str(e))
+            raise RuntimeError(
+                "Unable to determine the current directory: %s" % str(e))
 
         self.cacert_name = get_ca_nickname(self.realm)
 
@@ -142,6 +144,8 @@ def passwd_fname(self):
     def __del__(self):
         if self.reqdir is not None:
             shutil.rmtree(self.reqdir, ignore_errors=True)
+            self.reqdir = None
+        self.nssdb.close()
         try:
             os.chdir(self.cwd)
         except OSError:
@@ -162,22 +166,20 @@ def setup_cert_request(self):
         self.certreq_fname = self.reqdir + "/tmpcertreq"
         self.certder_fname = self.reqdir + "/tmpcert.der"
 
-        # When certutil makes a request it creates a file in the cwd, make
-        # sure we are in a unique place when this happens
-        os.chdir(self.reqdir)
-
-    def set_perms(self, fname, write=False, uid=None):
-        if uid:
-            pent = pwd.getpwnam(uid)
-            os.chown(fname, pent.pw_uid, pent.pw_gid)
-        else:
-            os.chown(fname, self.uid, self.gid)
+    def set_perms(self, fname, write=False):
         perms = stat.S_IRUSR
         if write:
             perms |= stat.S_IWUSR
-        os.chmod(fname, perms)
+        if hasattr(fname, 'fileno'):
+            os.fchown(fname.fileno(), self.uid, self.gid)
+            os.fchmod(fname.fileno(), perms)
+        else:
+            os.chown(fname, self.uid, self.gid)
+            os.chmod(fname, perms)
 
     def run_certutil(self, args, stdin=None, **kwargs):
+        # When certutil makes a request it creates a file in the cwd, make
+        # sure we are in a unique place when this happens
         return self.nssdb.run_certutil(args, stdin, **kwargs)
 
     def run_signtool(self, args, stdin=None):
@@ -191,19 +193,18 @@ def run_signtool(self, args, stdin=None):
     def create_noise_file(self):
         if ipautil.file_exists(self.noise_fname):
             os.remove(self.noise_fname)
-        f = open(self.noise_fname, "w")
-        f.write(ipautil.ipa_generate_password())
-        self.set_perms(self.noise_fname)
+        with open(self.noise_fname, "w") as f:
+            self.set_perms(f)
+            f.write(ipautil.ipa_generate_password())
 
     def create_passwd_file(self, passwd=None):
         ipautil.backup_file(self.passwd_fname)
-        f = open(self.passwd_fname, "w")
-        if passwd is not None:
-            f.write("%s\n" % passwd)
-        else:
-            f.write(ipautil.ipa_generate_password())
-        f.close()
-        self.set_perms(self.passwd_fname)
+        with open(self.passwd_fname, "w") as f:
+            self.set_perms(f)
+            if passwd is not None:
+                f.write("%s\n" % passwd)
+            else:
+                f.write(ipautil.ipa_generate_password())
 
     def create_certdbs(self):
         self.nssdb.create_db(user=self.user, group=self.group, mode=self.mode,
@@ -241,13 +242,13 @@ def export_ca_cert(self, nickname, create_pkcs12=False):
         # export the CA cert for use with other apps
         ipautil.backup_file(cacert_fname)
         root_nicknames = self.find_root_cert(nickname)[:-1]
-        fd = open(cacert_fname, "w")
-        for root in root_nicknames:
-            result = self.run_certutil(["-L", "-n", root, "-a"],
-                                       capture_output=True)
-            fd.write(result.output)
-        fd.close()
-        os.chmod(cacert_fname, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
+        with open(cacert_fname, "w") as f:
+            os.fchmod(f.fileno(), stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
+            for root in root_nicknames:
+                result = self.run_certutil(["-L", "-n", root, "-a"],
+                                           capture_output=True)
+                f.write(result.output)
+
         if create_pkcs12:
             ipautil.backup_file(self.pk12_fname)
             ipautil.run([paths.PK12UTIL, "-d", self.secdir,
@@ -262,9 +263,8 @@ def load_cacert(self, cacert_fname, trust_flags):
         Load all the certificates from a given file. It is assumed that
         this file creates CA certificates.
         """
-        fd = open(cacert_fname)
-        certs = fd.read()
-        fd.close()
+        with open(cacert_fname) as f:
+            certs = f.read()
 
         st = 0
         while True:
@@ -345,14 +345,14 @@ def create_server_cert(self, nickname, hostname, other_certdb=None, subject=None
         if subject is None:
             subject=DN(('CN', hostname), self.subject_base)
         self.request_cert(subject, san_dnsnames=[hostname])
-        cdb.issue_server_cert(self.certreq_fname, self.certder_fname)
-        self.import_cert(self.certder_fname, nickname)
-        fd = open(self.certder_fname, "r")
-        dercert = fd.read()
-        fd.close()
-
-        os.unlink(self.certreq_fname)
-        os.unlink(self.certder_fname)
+        try:
+            cdb.issue_server_cert(self.certreq_fname, self.certder_fname)
+            self.import_cert(self.certder_fname, nickname)
+            with open(self.certder_fname, "r") as f:
+                dercert = f.read()
+        finally:
+            os.unlink(self.certreq_fname)
+            os.unlink(self.certder_fname)
 
         return dercert
 
@@ -382,10 +382,8 @@ def issue_server_cert(self, certreq_fname, cert_fname):
         if self.host_name is None:
             raise RuntimeError("CA Host is not set.")
 
-        f = open(certreq_fname, "r")
-        csr = f.readlines()
-        f.close()
-        csr = "".join(csr)
+        with open(certreq_fname, "r") as f:
+            csr = f.read()
 
         # We just want the CSR bits, make sure there is nothing else
         csr = pkcs10.strip_header(csr)
@@ -397,9 +395,9 @@ def issue_server_cert(self, certreq_fname, cert_fname):
                 'xmlOutput': 'true'}
 
         # Send the request to the CA
-        f = open(self.passwd_fname, "r")
-        password = f.readline()
-        f.close()
+        with open(self.passwd_fname, "r") as f:
+            password = f.readline()
+
         result = dogtag.https_request(
             self.host_name, 8443, "/ca/ee/ca/profileSubmitSSLClient",
             self.secdir, password, "ipaCert", **params)
@@ -426,9 +424,8 @@ def issue_server_cert(self, certreq_fname, cert_fname):
 
         # Write the certificate to a file. It will be imported in a later
         # step. This file will be read later to be imported.
-        f = open(cert_fname, "w")
-        f.write(cert)
-        f.close()
+        with open(cert_fname, "w") as f:
+            f.write(cert)
 
     def issue_signing_cert(self, certreq_fname, cert_fname):
         self.setup_cert_request()
@@ -436,10 +433,8 @@ def issue_signing_cert(self, certreq_fname, cert_fname):
         if self.host_name is None:
             raise RuntimeError("CA Host is not set.")
 
-        f = open(certreq_fname, "r")
-        csr = f.readlines()
-        f.close()
-        csr = "".join(csr)
+        with open(certreq_fname, "r") as f:
+            csr = f.read()
 
         # We just want the CSR bits, make sure there is no thing else
         csr = pkcs10.strip_header(csr)
@@ -451,9 +446,9 @@ def issue_signing_cert(self, certreq_fname, cert_fname):
                 'xmlOutput': 'true'}
 
         # Send the request to the CA
-        f = open(self.passwd_fname, "r")
-        password = f.readline()
-        f.close()
+        with open(self.passwd_fname, "r") as f:
+            password = f.readline()
+
         result = dogtag.https_request(
             self.host_name, 8443, "/ca/ee/ca/profileSubmitSSLClient",
             self.secdir, password, "ipaCert", **params)
@@ -472,9 +467,8 @@ def issue_signing_cert(self, certreq_fname, cert_fname):
 
         # Write the certificate to a file. It will be imported in a later
         # step. This file will be read later to be imported.
-        f = open(cert_fname, "w")
-        f.write(cert)
-        f.close()
+        with open(cert_fname, "w") as f:
+            f.write(cert)
 
     def add_cert(self, cert, nick, flags, pem=False):
         self.nssdb.add_cert(cert, nick, flags, pem)
@@ -497,13 +491,11 @@ def create_pin_file(self):
         This is the format of Directory Server pin files.
         """
         ipautil.backup_file(self.pin_fname)
-        f = open(self.pin_fname, "w")
-        f.write("Internal (Software) Token:")
-        pwdfile = open(self.passwd_fname)
-        f.write(pwdfile.read())
-        f.close()
-        pwdfile.close()
-        self.set_perms(self.pin_fname)
+        with open(self.pin_fname, "w") as pinfile:
+            self.set_perms(pinfile)
+            pinfile.write("Internal (Software) Token:")
+            with open(self.passwd_fname) as pwdfile:
+                pinfile.write(pwdfile.read())
 
     def find_root_cert(self, nickname):
         """
@@ -553,10 +545,9 @@ def create_from_cacert(self):
         if ipautil.file_exists(self.certdb_fname):
             # We already have a cert db, see if it is for the same CA.
             # If it is we leave things as they are.
-            f = open(cacert_fname, "r")
-            newca = f.readlines()
-            f.close()
-            newca = "".join(newca)
+            with open(cacert_fname, "r") as f:
+                newca = f.read()
+
             newca, _st = find_cert_from_txt(newca)
 
             cacert = self.get_cert_from_db(self.cacert_name)
-- 
Manage your subscription for the Freeipa-devel mailing list:
https://www.redhat.com/mailman/listinfo/freeipa-devel
Contribute to FreeIPA: http://www.freeipa.org/page/Contribute/Code

Reply via email to