This is a first version of the instance move tool. It supports moving
one or more instances from one cluster to another. When moving a single
instance, the instance can be renamed, which also allows moves within
the same cluster (not really useful in practice, but good for testing).
Documentation is added to describe the new tool and its usage.

The “move-instance” tool uses the workerpool to support parallel moves of
instances. Supporting them was simple as threads were required anyway due
to the synchronous RAPI client.

Signed-off-by: Michael Hanselmann <[email protected]>
---
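Note for reviewers: the source and destination sides of a move run in
two threads that exchange export and import information through two
condition variables sharing one lock (see MoveRuntime in the patch).
Below is a minimal standalone sketch of that handshake, not the code
from the patch. It assumes Python 3, and the names Runtime, publish,
finish and run_move are made up for illustration.

```python
import threading


class Abort(Exception):
    """Raised in a waiting thread once the peer has given up."""


class Runtime(object):
    """Hypothetical stand-in for MoveRuntime: two conditions, one lock."""

    def __init__(self):
        self.lock = threading.Lock()
        self.source_to_dest = threading.Condition(self.lock)
        self.dest_to_source = threading.Condition(self.lock)
        self.abort = False
        self.src_expinfo = None
        self.dest_impinfo = None

    def wait(self, cond, not_ready_fn):
        # Block while the data is missing; give up if the peer aborted
        with cond:
            while not_ready_fn():
                if self.abort:
                    raise Abort()
                cond.wait()

    def publish(self, cond, attr, value):
        # Store a value under the lock and wake the waiting peer
        with cond:
            setattr(self, attr, value)
            cond.notify_all()

    def finish(self):
        # Called when a side ends (success or failure): wake all waiters
        with self.lock:
            self.abort = True
            self.source_to_dest.notify_all()
            self.dest_to_source.notify_all()


def run_move(fail_dest=False):
    rt = Runtime()
    log = []

    def dest():
        try:
            rt.wait(rt.source_to_dest, lambda: rt.src_expinfo is None)
            if fail_dest:
                raise RuntimeError("simulated import failure")
            log.append("dest got %s" % rt.src_expinfo)
            rt.publish(rt.dest_to_source, "dest_impinfo", "import-info")
        except Abort:
            log.append("dest aborted")
        except RuntimeError as err:
            log.append("dest failed: %s" % err)
        finally:
            rt.finish()

    thread = threading.Thread(target=dest, name="Dest")
    thread.start()
    try:
        rt.publish(rt.source_to_dest, "src_expinfo", "export-info")
        rt.wait(rt.dest_to_source, lambda: rt.dest_impinfo is None)
        log.append("source got %s" % rt.dest_impinfo)
    except Abort:
        log.append("source aborted")
    finally:
        rt.finish()
        thread.join()
    return log
```

Because the abort flag is set and checked under the same lock as the
condition variables, a waiter cannot miss the wake-up: it either sees
the data it was waiting for or raises Abort.
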
 Makefile.am           |    2 +
 doc/index.rst         |    1 +
 doc/move-instance.rst |   97 ++++++
 tools/move-instance   |  867 +++++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 967 insertions(+), 0 deletions(-)
 create mode 100644 doc/move-instance.rst
 create mode 100755 tools/move-instance
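
Note on option handling: the placement rules described by the option
help texts and the CheckOptions error messages can be summarised as a
small pure function. This is only a sketch under that reading; the
function name check_placement and its return convention are
hypothetical, and the order of checks in the tool itself may differ.

```python
def check_placement(instance_count, iallocator=None, dest_name=None,
                    pnode=None, snode=None):
    """Returns an error message, or None if the combination is acceptable."""
    if instance_count < 1:
        return "at least one instance is required"

    # An iallocator picks the nodes itself, so explicit nodes conflict
    if iallocator and (pnode or snode):
        return "destination node and iallocator options exclude each other"

    if instance_count == 1:
        # A single move needs some way to place the instance
        if not (iallocator or pnode or snode):
            return "an iallocator or the destination node is required"
        return None

    # Several instances cannot share one name or one explicit node pair
    if dest_name or pnode or snode:
        return "per-instance destination options need exactly one instance"
    if not iallocator:
        return "an iallocator is required for more than one instance"
    return None
```

For example, check_placement(1, pnode="node1") returns None, while
check_placement(2, pnode="node1") returns an error message.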

diff --git a/Makefile.am b/Makefile.am
index 04101d4..4dfce04 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -162,6 +162,7 @@ docrst = \
        doc/install-quick.rst \
        doc/install.rst \
        doc/locking.rst \
+       doc/move-instance.rst \
        doc/news.rst \
        doc/rapi.rst \
        doc/security.rst \
@@ -242,6 +243,7 @@ dist_tools_SCRIPTS = \
        tools/cfgupgrade12 \
        tools/cluster-merge \
        tools/lvmstrap \
+       tools/move-instance \
        tools/sanitize-config
 
 pkglib_python_scripts = \
diff --git a/doc/index.rst b/doc/index.rst
index 15b5054..be64523 100644
--- a/doc/index.rst
+++ b/doc/index.rst
@@ -22,6 +22,7 @@ Contents:
    hooks.rst
    iallocator.rst
    rapi.rst
+   move-instance.rst
    devnotes.rst
    news.rst
    glossary.rst
diff --git a/doc/move-instance.rst b/doc/move-instance.rst
new file mode 100644
index 0000000..14e5c35
--- /dev/null
+++ b/doc/move-instance.rst
@@ -0,0 +1,97 @@
+=================================
+Moving instances between clusters
+=================================
+
+Starting with Ganeti 2.2, instances can be moved between separate Ganeti
+clusters using a new tool, ``move-instance``. The tool has a number of
+features:
+
+- Moving a single or multiple instances
+- Moving instances in parallel (``--parallel`` option)
+- Renaming the instance (only when moving a single instance)
+- SSL certificate verification for RAPI connections
+
+The design of inter-cluster instance moves is described in detail in
+the :doc:`Ganeti 2.2 design document <design-2.2>`. The instance move
+tool talks to the Ganeti clusters via RAPI and can run on any machine
+which can connect to both clusters' RAPI servers. Despite the similar
+name, the instance move tool should not be confused with the
+``gnt-instance move`` command, which moves an instance within a cluster.
+
+
+Configuring clusters for instance moves
+---------------------------------------
+
+To prevent third parties from accessing the instance data, all data
+exchanged between the clusters is signed using a secret key, the
+"cluster domain secret". It is recommended to assign the same domain
+secret to all clusters of the same type, so that instances can be easily
+moved between them. By checking the signatures, the destination cluster
+can be sure the third party (e.g. this tool) didn't modify the received
+crypto keys and connection information.
+
+.. highlight:: sh
+
+To create a new, random cluster domain secret, run the following command
+on the master node::
+
+  gnt-cluster renew-crypto --new-cluster-domain-secret
+
+
+To set the cluster domain secret, run the following command on the
+master node::
+
+  gnt-cluster renew-crypto --cluster-domain-secret=/.../ganeti.cds
+
+
+Moving instances
+----------------
+
+As soon as the clusters share a cluster domain secret, instances can be
+moved. The tool usage is as follows::
+
+  move-instance [options] <source-cluster> <destination-cluster> <instance-name...>
+
+Multiple instances can be moved with one invocation of the instance move
+tool, though a few options are only available when moving a single
+instance.
+
+The most important options are listed below. Unless specified otherwise,
+destination-related options default to the source value (e.g. setting
+``--src-rapi-port=1234`` will make ``--dest-rapi-port``'s default 1234).
+
+``--src-rapi-port``/``--dest-rapi-port``
+  RAPI server TCP port, defaults to 5080.
+``--src-ca-file``/``--dest-ca-file``
+  Path to file containing the cluster's Certificate Authority (CA) in
+  PEM format. For self-signed certificates, this is the certificate
+  itself. For certificates signed by a third party CA, the complete
+  chain must be in the file (see documentation for
+  ``SSL_CTX_load_verify_locations(3)``).
+``--src-username``/``--dest-username``
+  RAPI username, must have write access to cluster.
+``--src-password-file``/``--dest-password-file``
+  Path to file containing RAPI password (make sure to restrict access to
+  this file).
+``--dest-instance-name``
+  When moving a single instance: Change name of instance on destination
+  cluster.
+``--dest-primary-node``
+  When moving a single instance: Primary node on destination cluster.
+``--dest-secondary-node``
+  When moving a single instance: Secondary node on destination cluster.
+``--iallocator``
+  Iallocator for creating instance on destination cluster.
+``--parallel``
+  Number of instance moves to run in parallel.
+``--verbose``/``--debug``
+  Increase output verbosity.
+
+The exit value of the tool is zero if and only if all instance moves
+were successful.
+
+.. vim: set textwidth=72 :
+.. Local Variables:
+.. mode: rst
+.. fill-column: 72
+.. End:
diff --git a/tools/move-instance b/tools/move-instance
new file mode 100755
index 0000000..93ff153
--- /dev/null
+++ b/tools/move-instance
@@ -0,0 +1,867 @@
+#!/usr/bin/python
+#
+
+# Copyright (C) 2010 Google Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA.
+
+"""Tool to move instances from one cluster to another.
+
+"""
+
+# pylint: disable-msg=C0103
+# C0103: Invalid name move-instance
+
+import os
+import sys
+import time
+import logging
+import optparse
+import threading
+
+from ganeti import cli
+from ganeti import constants
+from ganeti import utils
+from ganeti import workerpool
+from ganeti import compat
+from ganeti import rapi
+
+import ganeti.rapi.client # pylint: disable-msg=W0611
+import ganeti.rapi.client_utils
+
+
+SRC_RAPI_PORT_OPT = \
+  cli.cli_option("--src-rapi-port", action="store", type="int",
+                 dest="src_rapi_port", default=constants.DEFAULT_RAPI_PORT,
+                 help=("Source cluster RAPI port (defaults to %s)" %
+                       constants.DEFAULT_RAPI_PORT))
+
+SRC_CA_FILE_OPT = \
+  cli.cli_option("--src-ca-file", action="store", type="string",
+                 dest="src_ca_file",
+                 help=("File containing source cluster Certificate"
+                       " Authority (CA) in PEM format"))
+
+SRC_USERNAME_OPT = \
+  cli.cli_option("--src-username", action="store", type="string",
+                 dest="src_username", default=None,
+                 help="Source cluster username")
+
+SRC_PASSWORD_FILE_OPT = \
+  cli.cli_option("--src-password-file", action="store", type="string",
+                 dest="src_password_file",
+                 help="File containing source cluster password")
+
+DEST_RAPI_PORT_OPT = \
+  cli.cli_option("--dest-rapi-port", action="store", type="int",
+                 dest="dest_rapi_port", default=None,
+                 help=("Destination cluster RAPI port (defaults to source"
+                       " cluster RAPI port)"))
+
+DEST_CA_FILE_OPT = \
+  cli.cli_option("--dest-ca-file", action="store", type="string",
+                 dest="dest_ca_file",
+                 help=("File containing destination cluster Certificate"
+                       " Authority (CA) in PEM format (defaults to source"
+                       " cluster CA)"))
+
+DEST_USERNAME_OPT = \
+  cli.cli_option("--dest-username", action="store", type="string",
+                 dest="dest_username", default=None,
+                 help=("Destination cluster username (defaults to"
+                       " source cluster username)"))
+
+DEST_PASSWORD_FILE_OPT = \
+  cli.cli_option("--dest-password-file", action="store", type="string",
+                 dest="dest_password_file",
+                 help=("File containing destination cluster password"
+                       " (defaults to source cluster password)"))
+
+DEST_INSTANCE_NAME_OPT = \
+  cli.cli_option("--dest-instance-name", action="store", type="string",
+                 dest="dest_instance_name",
+                 help=("Instance name on destination cluster (only"
+                       " when moving exactly one instance)"))
+
+DEST_PRIMARY_NODE_OPT = \
+  cli.cli_option("--dest-primary-node", action="store", type="string",
+                 dest="dest_primary_node",
+                 help=("Primary node on destination cluster (only"
+                       " when moving exactly one instance)"))
+
+DEST_SECONDARY_NODE_OPT = \
+  cli.cli_option("--dest-secondary-node", action="store", type="string",
+                 dest="dest_secondary_node",
+                 help=("Secondary node on destination cluster (only"
+                       " when moving exactly one instance)"))
+
+PARALLEL_OPT = \
+  cli.cli_option("-p", "--parallel", action="store", type="int", default=1,
+                 dest="parallel", metavar="<number>",
+                 help="Number of instances to be moved simultaneously")
+
+
+class Error(Exception):
+  """Generic error.
+
+  """
+
+
+class Abort(Error):
+  """Special exception for aborting import/export.
+
+  """
+
+
+class RapiClientFactory:
+  """Factory class for creating RAPI clients.
+
+  @ivar src_cluster_name: Source cluster name
+  @ivar dest_cluster_name: Destination cluster name
+  @ivar GetSourceClient: Callable returning new client for source cluster
+  @ivar GetDestClient: Callable returning new client for destination cluster
+
+  """
+  def __init__(self, options, src_cluster_name, dest_cluster_name):
+    """Initializes this class.
+
+    @param options: Program options
+    @type src_cluster_name: string
+    @param src_cluster_name: Source cluster name
+    @type dest_cluster_name: string
+    @param dest_cluster_name: Destination cluster name
+
+    """
+    self.src_cluster_name = src_cluster_name
+    self.dest_cluster_name = dest_cluster_name
+
+    # TODO: Support for using system default paths for verifying SSL certificate
+    # (already implemented in CertAuthorityVerify)
+    logging.debug("Using '%s' as source CA", options.src_ca_file)
+    src_ssl_config = rapi.client.CertAuthorityVerify(cafile=options.src_ca_file)
+
+    if options.dest_ca_file:
+      logging.debug("Using '%s' as destination CA", options.dest_ca_file)
+      dest_ssl_config = \
+        rapi.client.CertAuthorityVerify(cafile=options.dest_ca_file)
+    else:
+      logging.debug("Using source CA for destination")
+      dest_ssl_config = src_ssl_config
+
+    logging.debug("Source RAPI server is %s:%s",
+                  src_cluster_name, options.src_rapi_port)
+    logging.debug("Source username is '%s'", options.src_username)
+
+    if options.src_username is None:
+      src_username = ""
+    else:
+      src_username = options.src_username
+
+    if options.src_password_file:
+      logging.debug("Reading '%s' for source password",
+                    options.src_password_file)
+      src_password = utils.ReadOneLineFile(options.src_password_file,
+                                           strict=True)
+    else:
+      logging.debug("Source has no password")
+      src_password = None
+
+    self.GetSourceClient = lambda: \
+      rapi.client.GanetiRapiClient(src_cluster_name,
+                                   port=options.src_rapi_port,
+                                   config_ssl_verification=src_ssl_config,
+                                   username=src_username,
+                                   password=src_password)
+
+    if options.dest_rapi_port:
+      dest_rapi_port = options.dest_rapi_port
+    else:
+      dest_rapi_port = options.src_rapi_port
+
+    if options.dest_username is None:
+      dest_username = src_username
+    else:
+      dest_username = options.dest_username
+
+    logging.debug("Destination RAPI server is %s:%s",
+                  dest_cluster_name, dest_rapi_port)
+    logging.debug("Destination username is '%s'", dest_username)
+
+    if options.dest_password_file:
+      logging.debug("Reading '%s' for destination password",
+                    options.dest_password_file)
+      dest_password = utils.ReadOneLineFile(options.dest_password_file,
+                                            strict=True)
+    else:
+      logging.debug("Using source password for destination")
+      dest_password = src_password
+
+    self.GetDestClient = lambda: \
+      rapi.client.GanetiRapiClient(dest_cluster_name,
+                                   port=dest_rapi_port,
+                                   config_ssl_verification=dest_ssl_config,
+                                   username=dest_username,
+                                   password=dest_password)
+
+
+class MoveJobPollReportCb(cli.JobPollReportCbBase):
+  def __init__(self, abort_check_fn, remote_import_fn):
+    """Initializes this class.
+
+    @type abort_check_fn: callable
+    @param abort_check_fn: Function to check whether move is aborted
+    @type remote_import_fn: callable or None
+    @param remote_import_fn: Callback for reporting received remote import
+                             information
+
+    """
+    cli.JobPollReportCbBase.__init__(self)
+    self._abort_check_fn = abort_check_fn
+    self._remote_import_fn = remote_import_fn
+
+  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
+    """Handles a log message.
+
+    """
+    if log_type == constants.ELOG_REMOTE_IMPORT:
+      logging.debug("Received remote import information")
+
+      if not self._remote_import_fn:
+        raise RuntimeError("Received unexpected remote import information")
+
+      assert "x509_ca" in log_msg
+      assert "disks" in log_msg
+
+      self._remote_import_fn(log_msg)
+
+      return
+
+    logging.info("[%s] %s", time.ctime(utils.MergeTime(timestamp)),
+                 utils.SafeEncode(log_msg))
+
+  def ReportNotChanged(self, job_id, status):
+    """Called if a job hasn't changed in a while.
+
+    """
+    try:
+      # Check whether we were told to abort by the other thread
+      self._abort_check_fn()
+    except Abort:
+      logging.warning("Aborting despite job %s still running", job_id)
+      raise
+
+
+class InstanceMove(object):
+  """Status class for instance moves.
+
+  """
+  def __init__(self, src_instance_name, dest_instance_name,
+               dest_pnode, dest_snode, dest_iallocator):
+    """Initializes this class.
+
+    @type src_instance_name: string
+    @param src_instance_name: Instance name on source cluster
+    @type dest_instance_name: string
+    @param dest_instance_name: Instance name on destination cluster
+    @type dest_pnode: string or None
+    @param dest_pnode: Name of primary node on destination cluster
+    @type dest_snode: string or None
+    @param dest_snode: Name of secondary node on destination cluster
+    @type dest_iallocator: string or None
+    @param dest_iallocator: Name of iallocator to use
+
+    """
+    self.src_instance_name = src_instance_name
+    self.dest_instance_name = dest_instance_name
+    self.dest_pnode = dest_pnode
+    self.dest_snode = dest_snode
+    self.dest_iallocator = dest_iallocator
+
+    self.success = None
+    self.error_message = None
+
+
+class MoveRuntime(object):
+  """Class to keep track of instance move.
+
+  """
+  def __init__(self, move):
+    """Initializes this class.
+
+    @type move: L{InstanceMove}
+
+    """
+    self.move = move
+
+    # Thread synchronization
+    self.lock = threading.Lock()
+    self.source_to_dest = threading.Condition(self.lock)
+    self.dest_to_source = threading.Condition(self.lock)
+
+    # Set when threads should abort
+    self.abort = None
+
+    # Source information
+    self.src_success = None
+    self.src_error_message = None
+    self.src_expinfo = None
+    self.src_instinfo = None
+
+    # Destination information
+    self.dest_success = None
+    self.dest_error_message = None
+    self.dest_impinfo = None
+
+  def HandleErrors(self, prefix, fn, *args):
+    """Wrapper to catch errors and abort threads.
+
+    @type prefix: string
+    @param prefix: Variable name prefix ("src" or "dest")
+    @type fn: callable
+    @param fn: Function
+
+    """
+    assert prefix in ("dest", "src")
+
+    try:
+      # Call inner function
+      fn(*args)
+
+      success = True
+      errmsg = None
+    except Abort:
+      success = False
+      errmsg = "Aborted"
+    except Exception, err:
+      logging.exception("Caught unhandled exception")
+      success = False
+      errmsg = str(err)
+
+    self.lock.acquire()
+    try:
+      # Tell all threads to abort
+      self.abort = True
+      self.source_to_dest.notifyAll()
+      self.dest_to_source.notifyAll()
+    finally:
+      self.lock.release()
+
+    setattr(self, "%s_success" % prefix, success)
+    setattr(self, "%s_error_message" % prefix, errmsg)
+
+  def CheckAbort(self):
+    """Check whether thread should be aborted.
+
+    @raise Abort: When thread should be aborted
+
+    """
+    if self.abort:
+      logging.info("Aborting")
+      raise Abort()
+
+  def Wait(self, cond, check_fn):
+    """Waits for a condition to become true.
+
+    @type cond: threading.Condition
+    @param cond: Threading condition
+    @type check_fn: callable
+    @param check_fn: Function to check whether condition is true
+
+    """
+    cond.acquire()
+    try:
+      while check_fn(self):
+        self.CheckAbort()
+        cond.wait()
+    finally:
+      cond.release()
+
+  def PollJob(self, cl, job_id, remote_import_fn=None):
+    """Wrapper for polling a job.
+
+    @type cl: L{rapi.client.GanetiRapiClient}
+    @param cl: RAPI client
+    @type job_id: string
+    @param job_id: Job ID
+    @type remote_import_fn: callable or None
+    @param remote_import_fn: Callback for reporting received remote import
+                             information
+
+    """
+    return rapi.client_utils.PollJob(cl, job_id,
+                                     MoveJobPollReportCb(self.CheckAbort,
+                                                         remote_import_fn))
+
+
+class MoveDestExecutor(object):
+  def __init__(self, dest_client, mrt):
+    """Destination side of an instance move.
+
+    @type dest_client: L{rapi.client.GanetiRapiClient}
+    @param dest_client: RAPI client
+    @type mrt: L{MoveRuntime}
+    @param mrt: Instance move runtime information
+
+    """
+    logging.debug("Waiting for instance information to become available")
+    mrt.Wait(mrt.source_to_dest,
+             lambda mrt: mrt.src_instinfo is None or mrt.src_expinfo is None)
+
+    logging.info("Creating instance %s in remote-import mode",
+                 mrt.move.dest_instance_name)
+    job_id = self._CreateInstance(dest_client, mrt.move.dest_instance_name,
+                                  mrt.move.dest_pnode, mrt.move.dest_snode,
+                                  mrt.move.dest_iallocator,
+                                  mrt.src_instinfo, mrt.src_expinfo)
+    mrt.PollJob(dest_client, job_id,
+                remote_import_fn=compat.partial(self._SetImportInfo, mrt))
+
+    logging.info("Import successful")
+
+  @staticmethod
+  def _SetImportInfo(mrt, impinfo):
+    """Sets the remote import information and notifies source thread.
+
+    @type mrt: L{MoveRuntime}
+    @param mrt: Instance move runtime information
+    @param impinfo: Remote import information
+
+    """
+    mrt.dest_to_source.acquire()
+    try:
+      mrt.dest_impinfo = impinfo
+      mrt.dest_to_source.notifyAll()
+    finally:
+      mrt.dest_to_source.release()
+
+  @staticmethod
+  def _CreateInstance(cl, name, pnode, snode, iallocator, instance, expinfo):
+    """Starts the instance creation in remote import mode.
+
+    @type cl: L{rapi.client.GanetiRapiClient}
+    @param cl: RAPI client
+    @type name: string
+    @param name: Instance name
+    @type pnode: string or None
+    @param pnode: Name of primary node on destination cluster
+    @type snode: string or None
+    @param snode: Name of secondary node on destination cluster
+    @type iallocator: string or None
+    @param iallocator: Name of iallocator to use
+    @type instance: dict
+    @param instance: Instance details from source cluster
+    @type expinfo: dict
+    @param expinfo: Prepared export information from source cluster
+    @return: Job ID
+
+    """
+    disk_template = instance["disk_template"]
+
+    disks = [{
+      "size": i["size"],
+      "mode": i["mode"],
+      } for i in instance["disks"]]
+
+    nics = [{
+      "ip": ip,
+      "mac": mac,
+      "mode": mode,
+      "link": link,
+      } for ip, mac, mode, link in instance["nics"]]
+
+    # TODO: Should this be the actual up/down status? (run_state)
+    start = (instance["config_state"] == "up")
+
+    assert len(disks) == len(instance["disks"])
+    assert len(nics) == len(instance["nics"])
+
+    return cl.CreateInstance(constants.INSTANCE_REMOTE_IMPORT,
+                             name, disk_template, disks, nics,
+                             os=instance["os"],
+                             pnode=pnode,
+                             snode=snode,
+                             start=start,
+                             ip_check=False,
+                             iallocator=iallocator,
+                             hypervisor=instance["hypervisor"],
+                             source_handshake=expinfo["handshake"],
+                             source_x509_ca=expinfo["x509_ca"],
+                             source_instance_name=instance["name"],
+                             beparams=instance["be_instance"],
+                             hvparams=instance["hv_instance"])
+
+
+class MoveSourceExecutor(object):
+  def __init__(self, src_client, mrt):
+    """Source side of an instance move.
+
+    @type src_client: L{rapi.client.GanetiRapiClient}
+    @param src_client: RAPI client
+    @type mrt: L{MoveRuntime}
+    @param mrt: Instance move runtime information
+
+    """
+    logging.info("Checking whether instance exists")
+    self._CheckInstance(src_client, mrt.move.src_instance_name)
+
+    logging.info("Retrieving instance information from source cluster")
+    instinfo = self._GetInstanceInfo(src_client, mrt.PollJob,
+                                     mrt.move.src_instance_name)
+
+    logging.info("Preparing export on source cluster")
+    expinfo = self._PrepareExport(src_client, mrt.PollJob,
+                                  mrt.move.src_instance_name)
+    assert "handshake" in expinfo
+    assert "x509_key_name" in expinfo
+    assert "x509_ca" in expinfo
+
+    # Hand information to destination thread
+    mrt.source_to_dest.acquire()
+    try:
+      mrt.src_instinfo = instinfo
+      mrt.src_expinfo = expinfo
+      mrt.source_to_dest.notifyAll()
+    finally:
+      mrt.source_to_dest.release()
+
+    logging.info("Waiting for destination information to become available")
+    mrt.Wait(mrt.dest_to_source, lambda mrt: mrt.dest_impinfo is None)
+
+    logging.info("Starting remote export on source cluster")
+    self._ExportInstance(src_client, mrt.PollJob, mrt.move.src_instance_name,
+                         expinfo["x509_key_name"], mrt.dest_impinfo)
+
+    logging.info("Export successful")
+
+  @staticmethod
+  def _CheckInstance(cl, name):
+    """Checks whether the instance exists on the source cluster.
+
+    @type cl: L{rapi.client.GanetiRapiClient}
+    @param cl: RAPI client
+    @type name: string
+    @param name: Instance name
+
+    """
+    try:
+      cl.GetInstance(name)
+    except rapi.client.GanetiApiError, err:
+      if err.code == rapi.client.HTTP_NOT_FOUND:
+        raise Error("Instance %s not found (%s)" % (name, str(err)))
+      raise
+
+  @staticmethod
+  def _GetInstanceInfo(cl, poll_job_fn, name):
+    """Retrieves detailed instance information from source cluster.
+
+    @type cl: L{rapi.client.GanetiRapiClient}
+    @param cl: RAPI client
+    @type poll_job_fn: callable
+    @param poll_job_fn: Function to poll for job result
+    @type name: string
+    @param name: Instance name
+
+    """
+    job_id = cl.GetInstanceInfo(name, static=True)
+    result = poll_job_fn(cl, job_id)
+    assert len(result[0].keys()) == 1
+    return result[0][result[0].keys()[0]]
+
+  @staticmethod
+  def _PrepareExport(cl, poll_job_fn, name):
+    """Prepares export on source cluster.
+
+    @type cl: L{rapi.client.GanetiRapiClient}
+    @param cl: RAPI client
+    @type poll_job_fn: callable
+    @param poll_job_fn: Function to poll for job result
+    @type name: string
+    @param name: Instance name
+
+    """
+    job_id = cl.PrepareExport(name, constants.EXPORT_MODE_REMOTE)
+    return poll_job_fn(cl, job_id)[0]
+
+  @staticmethod
+  def _ExportInstance(cl, poll_job_fn, name, x509_key_name, impinfo):
+    """Exports instance from source cluster.
+
+    @type cl: L{rapi.client.GanetiRapiClient}
+    @param cl: RAPI client
+    @type poll_job_fn: callable
+    @param poll_job_fn: Function to poll for job result
+    @type name: string
+    @param name: Instance name
+    @param x509_key_name: Source X509 key
+    @param impinfo: Import information from destination cluster
+
+    """
+    job_id = cl.ExportInstance(name, constants.EXPORT_MODE_REMOTE,
+                               impinfo["disks"], shutdown=True,
+                               remove_instance=True,
+                               x509_key_name=x509_key_name,
+                               destination_x509_ca=impinfo["x509_ca"])
+    (fin_resu, dresults) = poll_job_fn(cl, job_id)[0]
+
+    if not (fin_resu and compat.all(dresults)):
+      raise Error("Export failed for disks %s" %
+                  utils.CommaJoin(str(idx) for idx, result
+                                  in enumerate(dresults) if not result))
+
+
+class MoveSourceWorker(workerpool.BaseWorker):
+  def RunTask(self, rapi_factory, move): # pylint: disable-msg=W0221
+    """Executes an instance move.
+
+    @type rapi_factory: L{RapiClientFactory}
+    @param rapi_factory: RAPI client factory
+    @type move: L{InstanceMove}
+    @param move: Instance move information
+
+    """
+    try:
+      logging.info("Preparing to move %s from cluster %s to %s as %s",
+                   move.src_instance_name, rapi_factory.src_cluster_name,
+                   rapi_factory.dest_cluster_name, move.dest_instance_name)
+
+      mrt = MoveRuntime(move)
+
+      logging.debug("Starting destination thread")
+      dest_thread = threading.Thread(name="DestFor%s" % self.getName(),
+                                     target=mrt.HandleErrors,
+                                     args=("dest", MoveDestExecutor,
+                                           rapi_factory.GetDestClient(),
+                                           mrt, ))
+      dest_thread.start()
+      try:
+        mrt.HandleErrors("src", MoveSourceExecutor,
+                         rapi_factory.GetSourceClient(), mrt)
+      finally:
+        dest_thread.join()
+
+      move.success = (mrt.src_success and mrt.dest_success)
+      if mrt.src_error_message or mrt.dest_error_message:
+        move.error_message = ("Source error: %s, destination error: %s" %
+                              (mrt.src_error_message, mrt.dest_error_message))
+      else:
+        move.error_message = None
+    except Exception, err: # pylint: disable-msg=W0703
+      logging.exception("Caught unhandled exception")
+      move.success = False
+      move.error_message = str(err)
+
+
+def CheckRapiSetup(rapi_factory):
+  """Checks the RAPI setup by retrieving the version.
+
+  @type rapi_factory: L{RapiClientFactory}
+  @param rapi_factory: RAPI client factory
+
+  """
+  src_client = rapi_factory.GetSourceClient()
+  logging.info("Connecting to source RAPI server")
+  logging.info("Source cluster RAPI version: %s", src_client.GetVersion())
+
+  dest_client = rapi_factory.GetDestClient()
+  logging.info("Connecting to destination RAPI server")
+  logging.info("Destination cluster RAPI version: %s", dest_client.GetVersion())
+
+
+def SetupLogging(options):
+  """Setting up logging infrastructure.
+
+  @param options: Parsed command line options
+
+  """
+  fmt = "%(asctime)s: %(threadName)s "
+  if options.debug or options.verbose:
+    fmt += "%(levelname)s "
+  fmt += "%(message)s"
+
+  formatter = logging.Formatter(fmt)
+
+  stderr_handler = logging.StreamHandler()
+  stderr_handler.setFormatter(formatter)
+  if options.debug:
+    stderr_handler.setLevel(logging.NOTSET)
+  elif options.verbose:
+    stderr_handler.setLevel(logging.INFO)
+  else:
+    stderr_handler.setLevel(logging.ERROR)
+
+  root_logger = logging.getLogger("")
+  root_logger.setLevel(logging.NOTSET)
+  root_logger.addHandler(stderr_handler)
+
+
+def ParseOptions():
+  """Parses options passed to program.
+
+  """
+  program = os.path.basename(sys.argv[0])
+
+  parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]"
+                                        " <source-cluster> <dest-cluster>"
+                                        " <instance...>"),
+                                 prog=program)
+  parser.add_option(cli.DEBUG_OPT)
+  parser.add_option(cli.VERBOSE_OPT)
+  parser.add_option(cli.IALLOCATOR_OPT)
+  parser.add_option(SRC_RAPI_PORT_OPT)
+  parser.add_option(SRC_CA_FILE_OPT)
+  parser.add_option(SRC_USERNAME_OPT)
+  parser.add_option(SRC_PASSWORD_FILE_OPT)
+  parser.add_option(DEST_RAPI_PORT_OPT)
+  parser.add_option(DEST_CA_FILE_OPT)
+  parser.add_option(DEST_USERNAME_OPT)
+  parser.add_option(DEST_PASSWORD_FILE_OPT)
+  parser.add_option(DEST_INSTANCE_NAME_OPT)
+  parser.add_option(DEST_PRIMARY_NODE_OPT)
+  parser.add_option(DEST_SECONDARY_NODE_OPT)
+  parser.add_option(PARALLEL_OPT)
+
+  (options, args) = parser.parse_args()
+
+  return (parser, options, args)
+
+
+def CheckOptions(parser, options, args):
+  """Checks options and arguments for validity.
+
+  """
+  if len(args) < 3:
+    parser.error("Not enough arguments")
+
+  src_cluster_name = args.pop(0)
+  dest_cluster_name = args.pop(0)
+  instance_names = args
+
+  assert len(instance_names) > 0
+
+  # TODO: Remove once using system default paths for SSL certificate
+  # verification is implemented
+  if not options.src_ca_file:
+    parser.error("Missing source cluster CA file")
+
+  if options.parallel < 1:
+    parser.error("Number of simultaneous moves must be >= 1")
+
+  if (bool(options.iallocator) and
+      bool(options.dest_primary_node or options.dest_secondary_node)):
+    parser.error("Destination node and iallocator options exclude each other")
+
+  if len(instance_names) == 1:
+    # Moving one instance only
+    if not (options.iallocator or
+            options.dest_primary_node or
+            options.dest_secondary_node):
+      parser.error("An iallocator or the destination node is required")
+  else:
+    # Moving more than one instance
+    if (options.dest_instance_name or options.dest_primary_node or
+        options.dest_secondary_node):
+      parser.error("The options --dest-instance-name, --dest-primary-node and"
+                   " --dest-secondary-node can only be used when moving exactly"
+                   " one instance")
+
+    if not options.iallocator:
+      parser.error("An iallocator must be specified for moving more than one"
+                   " instance")
+
+  return (src_cluster_name, dest_cluster_name, instance_names)
+
+
+def main():
+  """Main routine.
+
+  """
+  (parser, options, args) = ParseOptions()
+
+  SetupLogging(options)
+
+  (src_cluster_name, dest_cluster_name, instance_names) = \
+    CheckOptions(parser, options, args)
+
+  logging.info("Source cluster: %s", src_cluster_name)
+  logging.info("Destination cluster: %s", dest_cluster_name)
+  logging.info("Instances to be moved: %s", utils.CommaJoin(instance_names))
+
+  rapi_factory = RapiClientFactory(options, src_cluster_name, dest_cluster_name)
+
+  CheckRapiSetup(rapi_factory)
+
+  assert (len(instance_names) == 1 or
+          not (options.dest_primary_node or options.dest_secondary_node))
+  assert len(instance_names) == 1 or options.iallocator
+  assert (len(instance_names) > 1 or options.iallocator or
+          options.dest_primary_node or options.dest_secondary_node)
+
+  # Prepare list of instance moves
+  moves = []
+  for src_instance_name in instance_names:
+    if options.dest_instance_name:
+      assert len(instance_names) == 1
+      # Rename instance
+      dest_instance_name = options.dest_instance_name
+    else:
+      dest_instance_name = src_instance_name
+
+    moves.append(InstanceMove(src_instance_name, dest_instance_name,
+                              options.dest_primary_node,
+                              options.dest_secondary_node,
+                              options.iallocator))
+
+  assert len(moves) == len(instance_names)
+
+  # Start workerpool
+  wp = workerpool.WorkerPool("Move", options.parallel, MoveSourceWorker)
+  try:
+    # Add instance moves to workerpool
+    for move in moves:
+      wp.AddTask(rapi_factory, move)
+
+    # Wait for all moves to finish
+    wp.Quiesce()
+
+  finally:
+    wp.TerminateWorkers()
+
+  # There should be no threads running at this point, hence not using locks
+  # anymore
+
+  logging.info("Instance move results:")
+
+  for move in moves:
+    if move.dest_instance_name == move.src_instance_name:
+      name = move.src_instance_name
+    else:
+      name = "%s as %s" % (move.src_instance_name, move.dest_instance_name)
+
+    if move.success and not move.error_message:
+      msg = "Success"
+    else:
+      msg = "Failed (%s)" % move.error_message
+
+    logging.info("%s: %s", name, msg)
+
+  if compat.all(move.success for move in moves):
+    sys.exit(constants.EXIT_SUCCESS)
+
+  sys.exit(constants.EXIT_FAILURE)
+
+
+if __name__ == "__main__":
+  main()
-- 
1.7.0.4
