So far, all our SSL certificates have been self-signed. Starting with this patch series, client certificates will be signed by the cluster certificate, so we need a utility function for creating certificates that are not self-signed.
Signed-off-by: Helga Velroyen <[email protected]> --- lib/utils/x509.py | 64 +++++++++++++++++++++++++++++++++++ test/py/ganeti.utils.x509_unittest.py | 36 +++++++++++++++++++- 2 files changed, 99 insertions(+), 1 deletion(-) diff --git a/lib/utils/x509.py b/lib/utils/x509.py index 6ab62c4..63ded07 100644 --- a/lib/utils/x509.py +++ b/lib/utils/x509.py @@ -54,6 +54,7 @@ X509_SIGNATURE = re.compile(r"^%s:\s*(?P<salt>%s+)/(?P<sign>%s+)$" % (re.escape(constants.X509_CERT_SIGNATURE_HEADER), HEX_CHAR_RE, HEX_CHAR_RE), re.S | re.I) +X509_CERT_SIGN_DIGEST = "SHA1" # Certificate verification results (CERT_WARNING, @@ -326,6 +327,69 @@ def GenerateSelfSignedSslCert(filename, serial_no, return (key_pem, cert_pem) +def GenerateSignedX509Cert(common_name, validity, serial_no, + signing_cert_pem): + """Generates a signed (but not self-signed) X509 certificate. + + @type common_name: string + @param common_name: commonName value, should be hostname of the machine + @type validity: int + @param validity: Validity for certificate in seconds + @type signing_cert_pem: X509 key + @param signing_cert_pem: PEM-encoded private key of the signing certificate + @return: a tuple of strings containing the PEM-encoded private key and + certificate + + """ + # Create key pair with private and public key. + key_pair = OpenSSL.crypto.PKey() + key_pair.generate_key(OpenSSL.crypto.TYPE_RSA, constants.RSA_KEY_BITS) + + # Create certificate sigining request. + req = OpenSSL.crypto.X509Req() + req.get_subject().CN = common_name + req.set_pubkey(key_pair) + req.sign(key_pair, X509_CERT_SIGN_DIGEST) + + # Load the certificates used for signing. + signing_key = OpenSSL.crypto.load_privatekey( + OpenSSL.crypto.FILETYPE_PEM, signing_cert_pem) + signing_cert = OpenSSL.crypto.load_certificate( + OpenSSL.crypto.FILETYPE_PEM, signing_cert_pem) + + # Create a certificate and sign it. 
+ cert = OpenSSL.crypto.X509() + cert.set_subject(req.get_subject()) + cert.set_serial_number(serial_no) + cert.gmtime_adj_notBefore(0) + cert.gmtime_adj_notAfter(validity) + cert.set_issuer(signing_cert.get_subject()) + cert.set_pubkey(req.get_pubkey()) + cert.sign(signing_key, X509_CERT_SIGN_DIGEST) + + # Encode the key and certificate in PEM format. + key_pem = OpenSSL.crypto.dump_privatekey( + OpenSSL.crypto.FILETYPE_PEM, key_pair) + cert_pem = OpenSSL.crypto.dump_certificate( + OpenSSL.crypto.FILETYPE_PEM, cert) + + return (key_pem, cert_pem) + + +def GenerateSignedSslCert(filename_cert, serial_no, + filename_signing_cert, + common_name=constants.X509_CERT_CN, + validity=constants.X509_CERT_DEFAULT_VALIDITY, + uid=-1, gid=-1): + signing_cert_pem = utils_io.ReadFile(filename_signing_cert) + (key_pem, cert_pem) = GenerateSignedX509Cert( + common_name, validity * 24 * 60 * 60, serial_no, signing_cert_pem) + + utils_io.WriteFile(filename_cert, mode=0440, data=key_pem + cert_pem, + uid=uid, gid=gid) + return (key_pem, cert_pem) + + def ExtractX509Certificate(pem): """Extracts the certificate from a PEM-formatted string. 
diff --git a/test/py/ganeti.utils.x509_unittest.py b/test/py/ganeti.utils.x509_unittest.py index 99ecd76..01ad894 100755 --- a/test/py/ganeti.utils.x509_unittest.py +++ b/test/py/ganeti.utils.x509_unittest.py @@ -245,7 +245,7 @@ class TestVerifyCertificateInner(unittest.TestCase): self.assertEqual(errcode, utils.CERT_ERROR) -class TestGenerateSelfSignedX509Cert(unittest.TestCase): +class TestGenerateX509Certs(unittest.TestCase): def setUp(self): self.tmpdir = tempfile.mkdtemp() @@ -294,6 +294,40 @@ class TestGenerateSelfSignedX509Cert(unittest.TestCase): self.assert_(self._checkRsaPrivateKey(cert1)) self.assert_(self._checkCertificate(cert1)) + def _checkKeyMatchesCert(self, key, cert): + ctx = OpenSSL.SSL.Context(OpenSSL.SSL.TLSv1_METHOD) + ctx.use_privatekey(key) + ctx.use_certificate(cert) + try: + ctx.check_privatekey() + except OpenSSL.SSL.Error: + return False + else: + return True + + def testSignedSslCertificate(self): + server_cert_filename = os.path.join(self.tmpdir, "server.pem") + utils.GenerateSelfSignedSslCert(server_cert_filename, 123456) + + client_hostname = "myhost.example.com" + client_cert_filename = os.path.join(self.tmpdir, "client.pem") + utils.GenerateSignedSslCert(client_cert_filename, 666, + server_cert_filename, common_name=client_hostname) + + client_cert_pem = utils.ReadFile(client_cert_filename) + + self._checkRsaPrivateKey(client_cert_pem) + self._checkCertificate(client_cert_pem) + + priv_key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, + client_cert_pem) + client_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, + client_cert_pem) + + self.assertTrue(self._checkKeyMatchesCert(priv_key, client_cert)) + self.assertEqual(client_cert.get_issuer().CN, "ganeti.example.com") + self.assertEqual(client_cert.get_subject().CN, client_hostname) + class TestCheckNodeCertificate(testutils.GanetiTestCase): def setUp(self): -- 2.4.3.573.g4eafbef
