This adds a unit test that covers a successful run of
LUClusterRenewCrypto. While writing the test, a few
opportunities for improvement became apparent; they are
addressed in this patch as well.

Signed-off-by: Helga Velroyen <[email protected]>
---
 lib/cmdlib/cluster.py              | 16 +++++-----
 test/py/cmdlib/cluster_unittest.py | 62 ++++++++++++++++++++++++++++++++++++++
 2 files changed, 71 insertions(+), 7 deletions(-)

diff --git a/lib/cmdlib/cluster.py b/lib/cmdlib/cluster.py
index 24972fa..7959ff0 100644
--- a/lib/cmdlib/cluster.py
+++ b/lib/cmdlib/cluster.py
@@ -1,8 +1,8 @@
 #
 #
 
-# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
-# All rights reserved.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014, 2015
+# Google Inc. All rights reserved.
 #
 # Redistribution and use in source and binary forms, with or without
 # modification, are permitted provided that the following conditions are
@@ -129,12 +129,14 @@ class LUClusterRenewCrypto(NoHooksLU):
     except IOError:
       logging.info("No old certificate available.")
 
-    new_master_digest = _UpdateMasterClientCert(self, master_uuid, cluster,
-                                                feedback_fn)
+    # Technically it should not be necessary to set the cert
+    # paths. However, due to a bug in the mock library, we
+    # have to do this to be able to test the function properly.
+    _UpdateMasterClientCert(
+        self, master_uuid, cluster, feedback_fn,
+        client_cert=pathutils.NODED_CLIENT_CERT_FILE,
+        client_cert_tmp=pathutils.NODED_CLIENT_CERT_FILE_TMP)
 
-    utils.AddNodeToCandidateCerts(master_uuid,
-                                  new_master_digest,
-                                  cluster.candidate_certs)
     nodes = self.cfg.GetAllNodesInfo()
     for (node_uuid, node_info) in nodes.items():
       if node_info.offline:
diff --git a/test/py/cmdlib/cluster_unittest.py b/test/py/cmdlib/cluster_unittest.py
index dc3a946..e3ea305 100644
--- a/test/py/cmdlib/cluster_unittest.py
+++ b/test/py/cmdlib/cluster_unittest.py
@@ -37,6 +37,8 @@ import OpenSSL
 import copy
 import unittest
 import operator
+import shutil
+import os
 
 from ganeti.cmdlib import cluster
 from ganeti import constants
@@ -2264,5 +2266,65 @@ class TestLUClusterVerifyDisks(CmdlibTestCase):
     self.assertEqual(1, len(result["jobs"]))
 
 
+class TestLUClusterRenewCrypto(CmdlibTestCase):
+
+  def setUp(self):
+    super(TestLUClusterRenewCrypto, self).setUp()
+    self._node_cert = self._CreateTempFile()
+    shutil.copy(testutils.TestDataFilename("cert1.pem"), self._node_cert)
+    self._client_node_cert = self._CreateTempFile()
+    shutil.copy(testutils.TestDataFilename("cert2.pem"), self._client_node_cert)
+    self._client_node_cert_tmp = self._CreateTempFile()
+
+  def tearDown(self):
+    super(TestLUClusterRenewCrypto, self).tearDown()
+
+  def _GetFakeDigest(self, uuid):
+    """Creates a fake SSL digest depending on the UUID of a node.
+
+    @type uuid: string
+    @param uuid: node UUID
+    @returns: a string impersonating an SSL digest
+
+    """
+    return "FA:KE:%s:%s:%s:%s" % (uuid[0:2], uuid[2:4], uuid[4:6], uuid[6:8])
+ 
+  @patchPathutils("cluster")
+  def testSuccessfulCase(self, pathutils):
+
+    # patch pathutils to point to temporary files
+    pathutils.NODED_CERT_FILE = self._node_cert 
+    pathutils.NODED_CLIENT_CERT_FILE = self._client_node_cert 
+    pathutils.NODED_CLIENT_CERT_FILE_TMP = \
+        self._client_node_cert_tmp
+
+    # create a few non-master, online nodes
+    num_nodes = 3
+    for _ in range(num_nodes):
+      self.cfg.AddNewNode()
+
+    # make sure the RPC calls are successful for all nodes
+    self.rpc.call_node_crypto_tokens = \
+      lambda node_uuid, _: self.RpcResultsBuilder() \
+        .CreateSuccessfulNodeResult(node_uuid,
+          [(constants.CRYPTO_TYPE_SSL_DIGEST, self._GetFakeDigest(node_uuid))])
+
+    op = opcodes.OpClusterRenewCrypto()
+    self.ExecOpCode(op)
+
+    # Check if the correct certificates exist and don't exist on the master
+    self.assertTrue(os.path.exists(pathutils.NODED_CERT_FILE))
+    self.assertTrue(os.path.exists(pathutils.NODED_CLIENT_CERT_FILE))
+    self.assertFalse(os.path.exists(pathutils.NODED_CLIENT_CERT_FILE_TMP))
+
+    # Check if we have the correct digests in the configuration
+    cluster = self.cfg.GetClusterInfo()
+    self.assertEqual(num_nodes + 1, len(cluster.candidate_certs))
+    nodes = self.cfg.GetAllNodesInfo()
+    for (node_uuid, _) in nodes.items():
+      expected_digest = self._GetFakeDigest(node_uuid)
+      self.assertEqual(expected_digest, cluster.candidate_certs[node_uuid])
+
+
 if __name__ == "__main__":
   testutils.GanetiTestProgram()
-- 
2.2.0.rc0.207.ga3a616c

Reply via email to