All LUBackup* classes are extracted to backup.py.

Signed-off-by: Thomas Thrainer <[email protected]>
---
 Makefile.am            |   1 +
 lib/cmdlib/__init__.py | 496 +--------------------------------------------
 lib/cmdlib/backup.py   | 533 +++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 536 insertions(+), 494 deletions(-)
 create mode 100644 lib/cmdlib/backup.py

diff --git a/Makefile.am b/Makefile.am
index 8b26d97..b8346b7 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -316,6 +316,7 @@ cmdlib_PYTHON = \
        lib/cmdlib/node.py \
        lib/cmdlib/instance.py \
        lib/cmdlib/instance_utils.py \
+       lib/cmdlib/backup.py \
        lib/cmdlib/tags.py \
        lib/cmdlib/network.py \
        lib/cmdlib/test.py
diff --git a/lib/cmdlib/__init__.py b/lib/cmdlib/__init__.py
index 5689165..91e6864 100644
--- a/lib/cmdlib/__init__.py
+++ b/lib/cmdlib/__init__.py
@@ -30,14 +30,12 @@
 
 import time
 import logging
-import OpenSSL
 
 from ganeti import utils
 from ganeti import errors
 from ganeti import locking
 from ganeti import constants
 from ganeti import compat
-from ganeti import masterd
 from ganeti import query
 from ganeti import qlang
 
@@ -83,6 +81,8 @@ from ganeti.cmdlib.instance import LUInstanceCreate, LUInstanceRename, \
   LUInstanceReinstall, LUInstanceReboot, LUInstanceConsole, \
   LUInstanceFailover, LUInstanceMigrate, LUInstanceMultiAlloc, \
   LUInstanceSetParams, LUInstanceChangeGroup
+from ganeti.cmdlib.backup import _ExportQuery, LUBackupQuery, \
+  LUBackupPrepare, LUBackupExport, LUBackupRemove
 from ganeti.cmdlib.tags import LUTagsGet, LUTagsSearch, LUTagsSet, LUTagsDel
 from ganeti.cmdlib.network import LUNetworkAdd, LUNetworkRemove, \
   LUNetworkSetParams, _NetworkQuery, LUNetworkQuery, LUNetworkConnect, \
@@ -629,498 +629,6 @@ class LUQueryFields(NoHooksLU):
     return query.QueryFields(self.qcls.FIELDS, self.op.fields)
 
 
-class LUBackupQuery(NoHooksLU):
-  """Query the exports list
-
-  """
-  REQ_BGL = False
-
-  def CheckArguments(self):
-    self.expq = _ExportQuery(qlang.MakeSimpleFilter("node", self.op.nodes),
-                             ["node", "export"], self.op.use_locking)
-
-  def ExpandNames(self):
-    self.expq.ExpandNames(self)
-
-  def DeclareLocks(self, level):
-    self.expq.DeclareLocks(self, level)
-
-  def Exec(self, feedback_fn):
-    result = {}
-
-    for (node, expname) in self.expq.OldStyleQuery(self):
-      if expname is None:
-        result[node] = False
-      else:
-        result.setdefault(node, []).append(expname)
-
-    return result
-
-
-class _ExportQuery(_QueryBase):
-  FIELDS = query.EXPORT_FIELDS
-
-  #: The node name is not a unique key for this query
-  SORT_FIELD = "node"
-
-  def ExpandNames(self, lu):
-    lu.needed_locks = {}
-
-    # The following variables interact with _QueryBase._GetNames
-    if self.names:
-      self.wanted = _GetWantedNodes(lu, self.names)
-    else:
-      self.wanted = locking.ALL_SET
-
-    self.do_locking = self.use_locking
-
-    if self.do_locking:
-      lu.share_locks = _ShareAll()
-      lu.needed_locks = {
-        locking.LEVEL_NODE: self.wanted,
-        }
-
-      if not self.names:
-        lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
-
-  def DeclareLocks(self, lu, level):
-    pass
-
-  def _GetQueryData(self, lu):
-    """Computes the list of nodes and their attributes.
-
-    """
-    # Locking is not used
-    # TODO
-    assert not (compat.any(lu.glm.is_owned(level)
-                           for level in locking.LEVELS
-                           if level != locking.LEVEL_CLUSTER) or
-                self.do_locking or self.use_locking)
-
-    nodes = self._GetNames(lu, lu.cfg.GetNodeList(), locking.LEVEL_NODE)
-
-    result = []
-
-    for (node, nres) in lu.rpc.call_export_list(nodes).items():
-      if nres.fail_msg:
-        result.append((node, None))
-      else:
-        result.extend((node, expname) for expname in nres.payload)
-
-    return result
-
-
-class LUBackupPrepare(NoHooksLU):
-  """Prepares an instance for an export and returns useful information.
-
-  """
-  REQ_BGL = False
-
-  def ExpandNames(self):
-    self._ExpandAndLockInstance()
-
-  def CheckPrereq(self):
-    """Check prerequisites.
-
-    """
-    instance_name = self.op.instance_name
-
-    self.instance = self.cfg.GetInstanceInfo(instance_name)
-    assert self.instance is not None, \
-          "Cannot retrieve locked instance %s" % self.op.instance_name
-    _CheckNodeOnline(self, self.instance.primary_node)
-
-    self._cds = _GetClusterDomainSecret()
-
-  def Exec(self, feedback_fn):
-    """Prepares an instance for an export.
-
-    """
-    instance = self.instance
-
-    if self.op.mode == constants.EXPORT_MODE_REMOTE:
-      salt = utils.GenerateSecret(8)
-
-      feedback_fn("Generating X509 certificate on %s" % instance.primary_node)
-      result = self.rpc.call_x509_cert_create(instance.primary_node,
-                                              constants.RIE_CERT_VALIDITY)
-      result.Raise("Can't create X509 key and certificate on %s" % result.node)
-
-      (name, cert_pem) = result.payload
-
-      cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
-                                             cert_pem)
-
-      return {
-        "handshake": masterd.instance.ComputeRemoteExportHandshake(self._cds),
-        "x509_key_name": (name, utils.Sha1Hmac(self._cds, name, salt=salt),
-                          salt),
-        "x509_ca": utils.SignX509Certificate(cert, self._cds, salt),
-        }
-
-    return None
-
-
-class LUBackupExport(LogicalUnit):
-  """Export an instance to an image in the cluster.
-
-  """
-  HPATH = "instance-export"
-  HTYPE = constants.HTYPE_INSTANCE
-  REQ_BGL = False
-
-  def CheckArguments(self):
-    """Check the arguments.
-
-    """
-    self.x509_key_name = self.op.x509_key_name
-    self.dest_x509_ca_pem = self.op.destination_x509_ca
-
-    if self.op.mode == constants.EXPORT_MODE_REMOTE:
-      if not self.x509_key_name:
-        raise errors.OpPrereqError("Missing X509 key name for encryption",
-                                   errors.ECODE_INVAL)
-
-      if not self.dest_x509_ca_pem:
-        raise errors.OpPrereqError("Missing destination X509 CA",
-                                   errors.ECODE_INVAL)
-
-  def ExpandNames(self):
-    self._ExpandAndLockInstance()
-
-    # Lock all nodes for local exports
-    if self.op.mode == constants.EXPORT_MODE_LOCAL:
-      # FIXME: lock only instance primary and destination node
-      #
-      # Sad but true, for now we have do lock all nodes, as we don't know where
-      # the previous export might be, and in this LU we search for it and
-      # remove it from its current node. In the future we could fix this by:
-      #  - making a tasklet to search (share-lock all), then create the
-      #    new one, then one to remove, after
-      #  - removing the removal operation altogether
-      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
-
-      # Allocations should be stopped while this LU runs with node locks, but
-      # it doesn't have to be exclusive
-      self.share_locks[locking.LEVEL_NODE_ALLOC] = 1
-      self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
-
-  def DeclareLocks(self, level):
-    """Last minute lock declaration."""
-    # All nodes are locked anyway, so nothing to do here.
-
-  def BuildHooksEnv(self):
-    """Build hooks env.
-
-    This will run on the master, primary node and target node.
-
-    """
-    env = {
-      "EXPORT_MODE": self.op.mode,
-      "EXPORT_NODE": self.op.target_node,
-      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
-      "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
-      # TODO: Generic function for boolean env variables
-      "REMOVE_INSTANCE": str(bool(self.op.remove_instance)),
-      }
-
-    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
-
-    return env
-
-  def BuildHooksNodes(self):
-    """Build hooks nodes.
-
-    """
-    nl = [self.cfg.GetMasterNode(), self.instance.primary_node]
-
-    if self.op.mode == constants.EXPORT_MODE_LOCAL:
-      nl.append(self.op.target_node)
-
-    return (nl, nl)
-
-  def CheckPrereq(self):
-    """Check prerequisites.
-
-    This checks that the instance and node names are valid.
-
-    """
-    instance_name = self.op.instance_name
-
-    self.instance = self.cfg.GetInstanceInfo(instance_name)
-    assert self.instance is not None, \
-          "Cannot retrieve locked instance %s" % self.op.instance_name
-    _CheckNodeOnline(self, self.instance.primary_node)
-
-    if (self.op.remove_instance and
-        self.instance.admin_state == constants.ADMINST_UP and
-        not self.op.shutdown):
-      raise errors.OpPrereqError("Can not remove instance without shutting it"
-                                 " down before", errors.ECODE_STATE)
-
-    if self.op.mode == constants.EXPORT_MODE_LOCAL:
-      self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
-      self.dst_node = self.cfg.GetNodeInfo(self.op.target_node)
-      assert self.dst_node is not None
-
-      _CheckNodeOnline(self, self.dst_node.name)
-      _CheckNodeNotDrained(self, self.dst_node.name)
-
-      self._cds = None
-      self.dest_disk_info = None
-      self.dest_x509_ca = None
-
-    elif self.op.mode == constants.EXPORT_MODE_REMOTE:
-      self.dst_node = None
-
-      if len(self.op.target_node) != len(self.instance.disks):
-        raise errors.OpPrereqError(("Received destination information for %s"
-                                    " disks, but instance %s has %s disks") %
-                                   (len(self.op.target_node), instance_name,
-                                    len(self.instance.disks)),
-                                   errors.ECODE_INVAL)
-
-      cds = _GetClusterDomainSecret()
-
-      # Check X509 key name
-      try:
-        (key_name, hmac_digest, hmac_salt) = self.x509_key_name
-      except (TypeError, ValueError), err:
-        raise errors.OpPrereqError("Invalid data for X509 key name: %s" % err,
-                                   errors.ECODE_INVAL)
-
-      if not utils.VerifySha1Hmac(cds, key_name, hmac_digest, salt=hmac_salt):
-        raise errors.OpPrereqError("HMAC for X509 key name is wrong",
-                                   errors.ECODE_INVAL)
-
-      # Load and verify CA
-      try:
-        (cert, _) = utils.LoadSignedX509Certificate(self.dest_x509_ca_pem, cds)
-      except OpenSSL.crypto.Error, err:
-        raise errors.OpPrereqError("Unable to load destination X509 CA (%s)" %
-                                   (err, ), errors.ECODE_INVAL)
-
-      (errcode, msg) = utils.VerifyX509Certificate(cert, None, None)
-      if errcode is not None:
-        raise errors.OpPrereqError("Invalid destination X509 CA (%s)" %
-                                   (msg, ), errors.ECODE_INVAL)
-
-      self.dest_x509_ca = cert
-
-      # Verify target information
-      disk_info = []
-      for idx, disk_data in enumerate(self.op.target_node):
-        try:
-          (host, port, magic) = \
-            masterd.instance.CheckRemoteExportDiskInfo(cds, idx, disk_data)
-        except errors.GenericError, err:
-          raise errors.OpPrereqError("Target info for disk %s: %s" %
-                                     (idx, err), errors.ECODE_INVAL)
-
-        disk_info.append((host, port, magic))
-
-      assert len(disk_info) == len(self.op.target_node)
-      self.dest_disk_info = disk_info
-
-    else:
-      raise errors.ProgrammerError("Unhandled export mode %r" %
-                                   self.op.mode)
-
-    # instance disk type verification
-    # TODO: Implement export support for file-based disks
-    for disk in self.instance.disks:
-      if disk.dev_type == constants.LD_FILE:
-        raise errors.OpPrereqError("Export not supported for instances with"
-                                   " file-based disks", errors.ECODE_INVAL)
-
-  def _CleanupExports(self, feedback_fn):
-    """Removes exports of current instance from all other nodes.
-
-    If an instance in a cluster with nodes A..D was exported to node C, its
-    exports will be removed from the nodes A, B and D.
-
-    """
-    assert self.op.mode != constants.EXPORT_MODE_REMOTE
-
-    nodelist = self.cfg.GetNodeList()
-    nodelist.remove(self.dst_node.name)
-
-    # on one-node clusters nodelist will be empty after the removal
-    # if we proceed the backup would be removed because OpBackupQuery
-    # substitutes an empty list with the full cluster node list.
-    iname = self.instance.name
-    if nodelist:
-      feedback_fn("Removing old exports for instance %s" % iname)
-      exportlist = self.rpc.call_export_list(nodelist)
-      for node in exportlist:
-        if exportlist[node].fail_msg:
-          continue
-        if iname in exportlist[node].payload:
-          msg = self.rpc.call_export_remove(node, iname).fail_msg
-          if msg:
-            self.LogWarning("Could not remove older export for instance %s"
-                            " on node %s: %s", iname, node, msg)
-
-  def Exec(self, feedback_fn):
-    """Export an instance to an image in the cluster.
-
-    """
-    assert self.op.mode in constants.EXPORT_MODES
-
-    instance = self.instance
-    src_node = instance.primary_node
-
-    if self.op.shutdown:
-      # shutdown the instance, but not the disks
-      feedback_fn("Shutting down instance %s" % instance.name)
-      result = self.rpc.call_instance_shutdown(src_node, instance,
-                                               self.op.shutdown_timeout,
-                                               self.op.reason)
-      # TODO: Maybe ignore failures if ignore_remove_failures is set
-      result.Raise("Could not shutdown instance %s on"
-                   " node %s" % (instance.name, src_node))
-
-    # set the disks ID correctly since call_instance_start needs the
-    # correct drbd minor to create the symlinks
-    for disk in instance.disks:
-      self.cfg.SetDiskID(disk, src_node)
-
-    activate_disks = (instance.admin_state != constants.ADMINST_UP)
-
-    if activate_disks:
-      # Activate the instance disks if we'exporting a stopped instance
-      feedback_fn("Activating disks for %s" % instance.name)
-      _StartInstanceDisks(self, instance, None)
-
-    try:
-      helper = masterd.instance.ExportInstanceHelper(self, feedback_fn,
-                                                     instance)
-
-      helper.CreateSnapshots()
-      try:
-        if (self.op.shutdown and
-            instance.admin_state == constants.ADMINST_UP and
-            not self.op.remove_instance):
-          assert not activate_disks
-          feedback_fn("Starting instance %s" % instance.name)
-          result = self.rpc.call_instance_start(src_node,
-                                                (instance, None, None), False,
-                                                 self.op.reason)
-          msg = result.fail_msg
-          if msg:
-            feedback_fn("Failed to start instance: %s" % msg)
-            _ShutdownInstanceDisks(self, instance)
-            raise errors.OpExecError("Could not start instance: %s" % msg)
-
-        if self.op.mode == constants.EXPORT_MODE_LOCAL:
-          (fin_resu, dresults) = helper.LocalExport(self.dst_node)
-        elif self.op.mode == constants.EXPORT_MODE_REMOTE:
-          connect_timeout = constants.RIE_CONNECT_TIMEOUT
-          timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
-
-          (key_name, _, _) = self.x509_key_name
-
-          dest_ca_pem = \
-            OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
-                                            self.dest_x509_ca)
-
-          (fin_resu, dresults) = helper.RemoteExport(self.dest_disk_info,
-                                                     key_name, dest_ca_pem,
-                                                     timeouts)
-      finally:
-        helper.Cleanup()
-
-      # Check for backwards compatibility
-      assert len(dresults) == len(instance.disks)
-      assert compat.all(isinstance(i, bool) for i in dresults), \
-             "Not all results are boolean: %r" % dresults
-
-    finally:
-      if activate_disks:
-        feedback_fn("Deactivating disks for %s" % instance.name)
-        _ShutdownInstanceDisks(self, instance)
-
-    if not (compat.all(dresults) and fin_resu):
-      failures = []
-      if not fin_resu:
-        failures.append("export finalization")
-      if not compat.all(dresults):
-        fdsk = utils.CommaJoin(idx for (idx, dsk) in enumerate(dresults)
-                               if not dsk)
-        failures.append("disk export: disk(s) %s" % fdsk)
-
-      raise errors.OpExecError("Export failed, errors in %s" %
-                               utils.CommaJoin(failures))
-
-    # At this point, the export was successful, we can cleanup/finish
-
-    # Remove instance if requested
-    if self.op.remove_instance:
-      feedback_fn("Removing instance %s" % instance.name)
-      _RemoveInstance(self, feedback_fn, instance,
-                      self.op.ignore_remove_failures)
-
-    if self.op.mode == constants.EXPORT_MODE_LOCAL:
-      self._CleanupExports(feedback_fn)
-
-    return fin_resu, dresults
-
-
-class LUBackupRemove(NoHooksLU):
-  """Remove exports related to the named instance.
-
-  """
-  REQ_BGL = False
-
-  def ExpandNames(self):
-    self.needed_locks = {
-      # We need all nodes to be locked in order for RemoveExport to work, but
-      # we don't need to lock the instance itself, as nothing will happen to it
-      # (and we can remove exports also for a removed instance)
-      locking.LEVEL_NODE: locking.ALL_SET,
-
-      # Removing backups is quick, so blocking allocations is justified
-      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
-      }
-
-    # Allocations should be stopped while this LU runs with node locks, but it
-    # doesn't have to be exclusive
-    self.share_locks[locking.LEVEL_NODE_ALLOC] = 1
-
-  def Exec(self, feedback_fn):
-    """Remove any export.
-
-    """
-    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
-    # If the instance was not found we'll try with the name that was passed in.
-    # This will only work if it was an FQDN, though.
-    fqdn_warn = False
-    if not instance_name:
-      fqdn_warn = True
-      instance_name = self.op.instance_name
-
-    locked_nodes = self.owned_locks(locking.LEVEL_NODE)
-    exportlist = self.rpc.call_export_list(locked_nodes)
-    found = False
-    for node in exportlist:
-      msg = exportlist[node].fail_msg
-      if msg:
-        self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
-        continue
-      if instance_name in exportlist[node].payload:
-        found = True
-        result = self.rpc.call_export_remove(node, instance_name)
-        msg = result.fail_msg
-        if msg:
-          logging.error("Could not remove export for instance %s"
-                        " on node %s: %s", instance_name, node, msg)
-
-    if fqdn_warn and not found:
-      feedback_fn("Export not found. If trying to remove an export belonging"
-                  " to a deleted instance please use its Fully Qualified"
-                  " Domain Name.")
-
-
 class LURestrictedCommand(NoHooksLU):
   """Logical unit for executing restricted commands.
 
diff --git a/lib/cmdlib/backup.py b/lib/cmdlib/backup.py
new file mode 100644
index 0000000..3cbc664
--- /dev/null
+++ b/lib/cmdlib/backup.py
@@ -0,0 +1,533 @@
+#
+#
+
+# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA.
+
+
+"""Logical units dealing with backup operations."""
+
+import OpenSSL
+import logging
+
+from ganeti import compat
+from ganeti import constants
+from ganeti import errors
+from ganeti import locking
+from ganeti import masterd
+from ganeti import qlang
+from ganeti import query
+from ganeti import utils
+
+from ganeti.cmdlib.base import _QueryBase, NoHooksLU, LogicalUnit
+from ganeti.cmdlib.common import _GetWantedNodes, _ShareAll, \
+  _CheckNodeOnline, _ExpandNodeName
+from ganeti.cmdlib.instance_utils import _GetClusterDomainSecret, \
+  _BuildInstanceHookEnvByObject, _CheckNodeNotDrained, _StartInstanceDisks, \
+  _ShutdownInstanceDisks, _RemoveInstance
+
+
+class _ExportQuery(_QueryBase):
+  FIELDS = query.EXPORT_FIELDS
+
+  #: The node name is not a unique key for this query
+  SORT_FIELD = "node"
+
+  def ExpandNames(self, lu):
+    lu.needed_locks = {}
+
+    # The following variables interact with _QueryBase._GetNames
+    if self.names:
+      self.wanted = _GetWantedNodes(lu, self.names)
+    else:
+      self.wanted = locking.ALL_SET
+
+    self.do_locking = self.use_locking
+
+    if self.do_locking:
+      lu.share_locks = _ShareAll()
+      lu.needed_locks = {
+        locking.LEVEL_NODE: self.wanted,
+        }
+
+      if not self.names:
+        lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
+
+  def DeclareLocks(self, lu, level):
+    pass
+
+  def _GetQueryData(self, lu):
+    """Computes the list of nodes and their attributes.
+
+    """
+    # Locking is not used
+    # TODO
+    assert not (compat.any(lu.glm.is_owned(level)
+                           for level in locking.LEVELS
+                           if level != locking.LEVEL_CLUSTER) or
+                self.do_locking or self.use_locking)
+
+    nodes = self._GetNames(lu, lu.cfg.GetNodeList(), locking.LEVEL_NODE)
+
+    result = []
+
+    for (node, nres) in lu.rpc.call_export_list(nodes).items():
+      if nres.fail_msg:
+        result.append((node, None))
+      else:
+        result.extend((node, expname) for expname in nres.payload)
+
+    return result
+
+
+class LUBackupQuery(NoHooksLU):
+  """Query the exports list
+
+  """
+  REQ_BGL = False
+
+  def CheckArguments(self):
+    self.expq = _ExportQuery(qlang.MakeSimpleFilter("node", self.op.nodes),
+                             ["node", "export"], self.op.use_locking)
+
+  def ExpandNames(self):
+    self.expq.ExpandNames(self)
+
+  def DeclareLocks(self, level):
+    self.expq.DeclareLocks(self, level)
+
+  def Exec(self, feedback_fn):
+    result = {}
+
+    for (node, expname) in self.expq.OldStyleQuery(self):
+      if expname is None:
+        result[node] = False
+      else:
+        result.setdefault(node, []).append(expname)
+
+    return result
+
+
+class LUBackupPrepare(NoHooksLU):
+  """Prepares an instance for an export and returns useful information.
+
+  """
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self._ExpandAndLockInstance()
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    """
+    instance_name = self.op.instance_name
+
+    self.instance = self.cfg.GetInstanceInfo(instance_name)
+    assert self.instance is not None, \
+          "Cannot retrieve locked instance %s" % self.op.instance_name
+    _CheckNodeOnline(self, self.instance.primary_node)
+
+    self._cds = _GetClusterDomainSecret()
+
+  def Exec(self, feedback_fn):
+    """Prepares an instance for an export.
+
+    """
+    instance = self.instance
+
+    if self.op.mode == constants.EXPORT_MODE_REMOTE:
+      salt = utils.GenerateSecret(8)
+
+      feedback_fn("Generating X509 certificate on %s" % instance.primary_node)
+      result = self.rpc.call_x509_cert_create(instance.primary_node,
+                                              constants.RIE_CERT_VALIDITY)
+      result.Raise("Can't create X509 key and certificate on %s" % result.node)
+
+      (name, cert_pem) = result.payload
+
+      cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
+                                             cert_pem)
+
+      return {
+        "handshake": masterd.instance.ComputeRemoteExportHandshake(self._cds),
+        "x509_key_name": (name, utils.Sha1Hmac(self._cds, name, salt=salt),
+                          salt),
+        "x509_ca": utils.SignX509Certificate(cert, self._cds, salt),
+        }
+
+    return None
+
+
+class LUBackupExport(LogicalUnit):
+  """Export an instance to an image in the cluster.
+
+  """
+  HPATH = "instance-export"
+  HTYPE = constants.HTYPE_INSTANCE
+  REQ_BGL = False
+
+  def CheckArguments(self):
+    """Check the arguments.
+
+    """
+    self.x509_key_name = self.op.x509_key_name
+    self.dest_x509_ca_pem = self.op.destination_x509_ca
+
+    if self.op.mode == constants.EXPORT_MODE_REMOTE:
+      if not self.x509_key_name:
+        raise errors.OpPrereqError("Missing X509 key name for encryption",
+                                   errors.ECODE_INVAL)
+
+      if not self.dest_x509_ca_pem:
+        raise errors.OpPrereqError("Missing destination X509 CA",
+                                   errors.ECODE_INVAL)
+
+  def ExpandNames(self):
+    self._ExpandAndLockInstance()
+
+    # Lock all nodes for local exports
+    if self.op.mode == constants.EXPORT_MODE_LOCAL:
+      # FIXME: lock only instance primary and destination node
+      #
+      # Sad but true, for now we have to lock all nodes, as we don't know where
+      # the previous export might be, and in this LU we search for it and
+      # remove it from its current node. In the future we could fix this by:
+      #  - making a tasklet to search (share-lock all), then create the
+      #    new one, then one to remove, after
+      #  - removing the removal operation altogether
+      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+
+      # Allocations should be stopped while this LU runs with node locks, but
+      # it doesn't have to be exclusive
+      self.share_locks[locking.LEVEL_NODE_ALLOC] = 1
+      self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
+
+  def DeclareLocks(self, level):
+    """Last minute lock declaration."""
+    # All nodes are locked anyway, so nothing to do here.
+
+  def BuildHooksEnv(self):
+    """Build hooks env.
+
+    This will run on the master, primary node and target node.
+
+    """
+    env = {
+      "EXPORT_MODE": self.op.mode,
+      "EXPORT_NODE": self.op.target_node,
+      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
+      "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
+      # TODO: Generic function for boolean env variables
+      "REMOVE_INSTANCE": str(bool(self.op.remove_instance)),
+      }
+
+    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
+
+    return env
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
+    nl = [self.cfg.GetMasterNode(), self.instance.primary_node]
+
+    if self.op.mode == constants.EXPORT_MODE_LOCAL:
+      nl.append(self.op.target_node)
+
+    return (nl, nl)
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    This checks that the instance and node names are valid.
+
+    """
+    instance_name = self.op.instance_name
+
+    self.instance = self.cfg.GetInstanceInfo(instance_name)
+    assert self.instance is not None, \
+          "Cannot retrieve locked instance %s" % self.op.instance_name
+    _CheckNodeOnline(self, self.instance.primary_node)
+
+    if (self.op.remove_instance and
+        self.instance.admin_state == constants.ADMINST_UP and
+        not self.op.shutdown):
+      raise errors.OpPrereqError("Can not remove instance without shutting it"
+                                 " down before", errors.ECODE_STATE)
+
+    if self.op.mode == constants.EXPORT_MODE_LOCAL:
+      self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
+      self.dst_node = self.cfg.GetNodeInfo(self.op.target_node)
+      assert self.dst_node is not None
+
+      _CheckNodeOnline(self, self.dst_node.name)
+      _CheckNodeNotDrained(self, self.dst_node.name)
+
+      self._cds = None
+      self.dest_disk_info = None
+      self.dest_x509_ca = None
+
+    elif self.op.mode == constants.EXPORT_MODE_REMOTE:
+      self.dst_node = None
+
+      if len(self.op.target_node) != len(self.instance.disks):
+        raise errors.OpPrereqError(("Received destination information for %s"
+                                    " disks, but instance %s has %s disks") %
+                                   (len(self.op.target_node), instance_name,
+                                    len(self.instance.disks)),
+                                   errors.ECODE_INVAL)
+
+      cds = _GetClusterDomainSecret()
+
+      # Check X509 key name
+      try:
+        (key_name, hmac_digest, hmac_salt) = self.x509_key_name
+      except (TypeError, ValueError), err:
+        raise errors.OpPrereqError("Invalid data for X509 key name: %s" % err,
+                                   errors.ECODE_INVAL)
+
+      if not utils.VerifySha1Hmac(cds, key_name, hmac_digest, salt=hmac_salt):
+        raise errors.OpPrereqError("HMAC for X509 key name is wrong",
+                                   errors.ECODE_INVAL)
+
+      # Load and verify CA
+      try:
+        (cert, _) = utils.LoadSignedX509Certificate(self.dest_x509_ca_pem, cds)
+      except OpenSSL.crypto.Error, err:
+        raise errors.OpPrereqError("Unable to load destination X509 CA (%s)" %
+                                   (err, ), errors.ECODE_INVAL)
+
+      (errcode, msg) = utils.VerifyX509Certificate(cert, None, None)
+      if errcode is not None:
+        raise errors.OpPrereqError("Invalid destination X509 CA (%s)" %
+                                   (msg, ), errors.ECODE_INVAL)
+
+      self.dest_x509_ca = cert
+
+      # Verify target information
+      disk_info = []
+      for idx, disk_data in enumerate(self.op.target_node):
+        try:
+          (host, port, magic) = \
+            masterd.instance.CheckRemoteExportDiskInfo(cds, idx, disk_data)
+        except errors.GenericError, err:
+          raise errors.OpPrereqError("Target info for disk %s: %s" %
+                                     (idx, err), errors.ECODE_INVAL)
+
+        disk_info.append((host, port, magic))
+
+      assert len(disk_info) == len(self.op.target_node)
+      self.dest_disk_info = disk_info
+
+    else:
+      raise errors.ProgrammerError("Unhandled export mode %r" %
+                                   self.op.mode)
+
+    # instance disk type verification
+    # TODO: Implement export support for file-based disks
+    for disk in self.instance.disks:
+      if disk.dev_type == constants.LD_FILE:
+        raise errors.OpPrereqError("Export not supported for instances with"
+                                   " file-based disks", errors.ECODE_INVAL)
+
+  def _CleanupExports(self, feedback_fn):
+    """Removes exports of current instance from all other nodes.
+
+    If an instance in a cluster with nodes A..D was exported to node C, its
+    exports will be removed from the nodes A, B and D.
+
+    Failures to query a node or to remove an old export are logged as
+    warnings but do not abort the operation (best-effort cleanup).
+
+    @param feedback_fn: callable used to report progress to the caller
+
+    """
+    # Remote exports leave nothing on cluster nodes, so this cleanup only
+    # makes sense for local-mode exports.
+    assert self.op.mode != constants.EXPORT_MODE_REMOTE
+
+    # All nodes except the one that now holds the fresh export
+    nodelist = self.cfg.GetNodeList()
+    nodelist.remove(self.dst_node.name)
+
+    # on one-node clusters nodelist will be empty after the removal
+    # if we proceed the backup would be removed because OpBackupQuery
+    # substitutes an empty list with the full cluster node list.
+    iname = self.instance.name
+    if nodelist:
+      feedback_fn("Removing old exports for instance %s" % iname)
+      exportlist = self.rpc.call_export_list(nodelist)
+      for node in exportlist:
+        # Skip nodes that could not be queried; best-effort only
+        if exportlist[node].fail_msg:
+          continue
+        if iname in exportlist[node].payload:
+          msg = self.rpc.call_export_remove(node, iname).fail_msg
+          if msg:
+            self.LogWarning("Could not remove older export for instance %s"
+                            " on node %s: %s", iname, node, msg)
+
+  def Exec(self, feedback_fn):
+    """Export an instance to an image in the cluster.
+
+    Optionally shuts the instance down for a consistent snapshot, exports
+    the disks either to another node (local mode) or to a remote cluster
+    (remote mode), restarts/deactivates disks as appropriate, and finally
+    removes the instance and/or stale exports if requested.
+
+    @param feedback_fn: callable used to report progress to the caller
+    @return: tuple of (finalization result, per-disk export results); the
+        second element is a list of booleans, one per instance disk
+    @raise errors.OpExecError: if the instance cannot be restarted after
+        the snapshot, or if finalization or any disk export failed
+
+    """
+    assert self.op.mode in constants.EXPORT_MODES
+
+    instance = self.instance
+    src_node = instance.primary_node
+
+    if self.op.shutdown:
+      # shutdown the instance, but not the disks
+      feedback_fn("Shutting down instance %s" % instance.name)
+      result = self.rpc.call_instance_shutdown(src_node, instance,
+                                               self.op.shutdown_timeout,
+                                               self.op.reason)
+      # TODO: Maybe ignore failures if ignore_remove_failures is set
+      result.Raise("Could not shutdown instance %s on"
+                   " node %s" % (instance.name, src_node))
+
+    # set the disks ID correctly since call_instance_start needs the
+    # correct drbd minor to create the symlinks
+    for disk in instance.disks:
+      self.cfg.SetDiskID(disk, src_node)
+
+    # Disks of an administratively-down instance must be activated first
+    activate_disks = (instance.admin_state != constants.ADMINST_UP)
+
+    if activate_disks:
+      # Activate the instance disks if we're exporting a stopped instance
+      feedback_fn("Activating disks for %s" % instance.name)
+      _StartInstanceDisks(self, instance, None)
+
+    try:
+      helper = masterd.instance.ExportInstanceHelper(self, feedback_fn,
+                                                     instance)
+
+      helper.CreateSnapshots()
+      try:
+        # Once snapshots exist, a previously-running instance can be
+        # restarted while the export proceeds from the snapshots --
+        # unless it is about to be removed anyway.
+        if (self.op.shutdown and
+            instance.admin_state == constants.ADMINST_UP and
+            not self.op.remove_instance):
+          assert not activate_disks
+          feedback_fn("Starting instance %s" % instance.name)
+          result = self.rpc.call_instance_start(src_node,
+                                                (instance, None, None), False,
+                                                 self.op.reason)
+          msg = result.fail_msg
+          if msg:
+            feedback_fn("Failed to start instance: %s" % msg)
+            _ShutdownInstanceDisks(self, instance)
+            raise errors.OpExecError("Could not start instance: %s" % msg)
+
+        if self.op.mode == constants.EXPORT_MODE_LOCAL:
+          (fin_resu, dresults) = helper.LocalExport(self.dst_node)
+        elif self.op.mode == constants.EXPORT_MODE_REMOTE:
+          connect_timeout = constants.RIE_CONNECT_TIMEOUT
+          timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
+
+          (key_name, _, _) = self.x509_key_name
+
+          # The destination CA is re-serialized to PEM for transfer
+          dest_ca_pem = \
+            OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
+                                            self.dest_x509_ca)
+
+          (fin_resu, dresults) = helper.RemoteExport(self.dest_disk_info,
+                                                     key_name, dest_ca_pem,
+                                                     timeouts)
+      finally:
+        helper.Cleanup()
+
+      # Check for backwards compatibility
+      assert len(dresults) == len(instance.disks)
+      assert compat.all(isinstance(i, bool) for i in dresults), \
+             "Not all results are boolean: %r" % dresults
+
+    finally:
+      # Undo the earlier activation even if the export failed
+      if activate_disks:
+        feedback_fn("Deactivating disks for %s" % instance.name)
+        _ShutdownInstanceDisks(self, instance)
+
+    if not (compat.all(dresults) and fin_resu):
+      # Aggregate all failure sources into a single, descriptive error
+      failures = []
+      if not fin_resu:
+        failures.append("export finalization")
+      if not compat.all(dresults):
+        fdsk = utils.CommaJoin(idx for (idx, dsk) in enumerate(dresults)
+                               if not dsk)
+        failures.append("disk export: disk(s) %s" % fdsk)
+
+      raise errors.OpExecError("Export failed, errors in %s" %
+                               utils.CommaJoin(failures))
+
+    # At this point, the export was successful, we can cleanup/finish
+
+    # Remove instance if requested
+    if self.op.remove_instance:
+      feedback_fn("Removing instance %s" % instance.name)
+      _RemoveInstance(self, feedback_fn, instance,
+                      self.op.ignore_remove_failures)
+
+    if self.op.mode == constants.EXPORT_MODE_LOCAL:
+      self._CleanupExports(feedback_fn)
+
+    return fin_resu, dresults
+
+
+class LUBackupRemove(NoHooksLU):
+  """Remove exports related to the named instance.
+
+  This works even for instances that no longer exist in the cluster
+  configuration, as long as the export name matches.
+
+  """
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    """Declare the locks needed to remove exports cluster-wide.
+
+    """
+    self.needed_locks = {
+      # We need all nodes to be locked in order for RemoveExport to work, but
+      # we don't need to lock the instance itself, as nothing will happen to it
+      # (and we can remove exports also for a removed instance)
+      locking.LEVEL_NODE: locking.ALL_SET,
+
+      # Removing backups is quick, so blocking allocations is justified
+      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
+      }
+
+    # Allocations should be stopped while this LU runs with node locks, but it
+    # doesn't have to be exclusive
+    self.share_locks[locking.LEVEL_NODE_ALLOC] = 1
+
+  def Exec(self, feedback_fn):
+    """Remove any export.
+
+    Queries all locked nodes for exports matching the instance name and
+    removes every match; removal failures are logged but not fatal.
+
+    @param feedback_fn: callable used to report progress to the caller
+
+    """
+    # ExpandInstanceName yields a falsy value for unknown instances
+    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
+    # If the instance was not found we'll try with the name that was passed in.
+    # This will only work if it was an FQDN, though.
+    fqdn_warn = False
+    if not instance_name:
+      fqdn_warn = True
+      instance_name = self.op.instance_name
+
+    locked_nodes = self.owned_locks(locking.LEVEL_NODE)
+    exportlist = self.rpc.call_export_list(locked_nodes)
+    found = False
+    for node in exportlist:
+      msg = exportlist[node].fail_msg
+      if msg:
+        self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
+        continue
+      if instance_name in exportlist[node].payload:
+        found = True
+        result = self.rpc.call_export_remove(node, instance_name)
+        msg = result.fail_msg
+        if msg:
+          # Best-effort: log the failure and keep processing other nodes
+          logging.error("Could not remove export for instance %s"
+                        " on node %s: %s", instance_name, node, msg)
+
+    if fqdn_warn and not found:
+      feedback_fn("Export not found. If trying to remove an export belonging"
+                  " to a deleted instance please use its Fully Qualified"
+                  " Domain Name.")
-- 
1.8.2.1

Reply via email to