LGTM.

Thanks,
Jose

On Fri, Oct 04, 2013 at 02:20:34PM +0200, Klaus Aehlig wrote:
> This command will coordinate the switching to a new
> Ganeti version across the cluster. This has become
> possible by the new layout that allows several Ganeti
> versions to be present at the same time.
> 
> Signed-off-by: Klaus Aehlig <[email protected]>
> ---
>  lib/client/gnt_cluster.py | 283 ++++++++++++++++++++++++++++++++++++++++++++++
>  1 file changed, 283 insertions(+)
> 
> diff --git a/lib/client/gnt_cluster.py b/lib/client/gnt_cluster.py
> index 8d81519..225cd56 100644
> --- a/lib/client/gnt_cluster.py
> +++ b/lib/client/gnt_cluster.py
> @@ -43,7 +43,9 @@ from ganeti import objects
>  from ganeti import uidpool
>  from ganeti import compat
>  from ganeti import netutils
> +from ganeti import ssconf
>  from ganeti import pathutils
> +from ganeti import qlang
>  
>  
>  ON_OPT = cli_option("--on", default=False,
> @@ -64,6 +66,12 @@ FORCE_DISTRIBUTION = cli_option("--yes-do-it", dest="yes_do_it",
>                                  " is drained",
>                                  default=False, action="store_true")
>  
> +TO_OPT = cli_option("--to", default=None, type="string",
> +                    help="The Ganeti version to upgrade to")
> +
> +RESUME_OPT = cli_option("--resume", default=False, action="store_true",
> +                        help="Resume any pending Ganeti upgrades")
> +
>  _EPO_PING_INTERVAL = 30 # 30 seconds between pings
>  _EPO_PING_TIMEOUT = 1 # 1 second
>  _EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes
> @@ -1617,6 +1625,278 @@ def ShowCreateCommand(opts, args):
>    ToStdout(_GetCreateCommand(result))
>  
>  
> +def _RunCommandAndReport(cmd):
> +  """Run a command and report its output, iff it failed.
> +
> +  @param cmd: the command to execute
> +  @type cmd: list
> +  @rtype: bool
> +  @return: False, if the execution failed.
> +
> +  """
> +  result = utils.RunCmd(cmd)
> +  if result.failed:
> +    ToStderr("Command %s failed: %s; Output %s" %
> +             (cmd, result.fail_reason, result.output))
> +    return False
> +  return True
> +
> +
> +def _VerifyCommand(cmd):
> +  """Verify that a given command succeeds on all online nodes.
> +
> +  As this function is intended to run during upgrades, it
> +  is implemented in such a way that it still works, if all Ganeti
> +  daemons are down.
> +
> +  @param cmd: the command to execute
> +  @type cmd: list
> +  @rtype: list
> +  @return: the list of node names that are online where
> +      the command failed.
> +
> +  """
> +  command = utils.text.ShellQuoteArgs([str(val) for val in cmd])
> +
> +  nodes = ssconf.SimpleStore().GetOnlineNodeList()
> +  master_node = ssconf.SimpleStore().GetMasterNode()
> +  cluster_name = ssconf.SimpleStore().GetClusterName()
> +
> +  # If master node is in 'nodes', make sure master node is at list end
> +  if master_node in nodes:
> +    nodes.remove(master_node)
> +    nodes.append(master_node)
> +
> +  failed = []
> +
> +  srun = ssh.SshRunner(cluster_name=cluster_name)
> +  for name in nodes:
> +    result = srun.Run(name, constants.SSH_LOGIN_USER, command)
> +    if result.exit_code != 0:
> +      failed.append(name)
> +
> +  return failed
> +
> +
> +def _VerifyVersionInstalled(versionstring):
> +  """Verify that the given version of ganeti is installed on all online nodes.
> +
> +  Do nothing, if this is the case, otherwise print an appropriate
> +  message to stderr.
> +
> +  @param versionstring: the version to check for
> +  @type versionstring: string
> +  @rtype: bool
> +  @return: True, if the version is installed on all online nodes
> +
> +  """
> +  badnodes = _VerifyCommand(["test", "-d",
> +                             os.path.join(pathutils.PKGLIBDIR, versionstring)])
> +  if badnodes:
> +    ToStderr("Ganeti version %s not installed on nodes %s"
> +             % (versionstring, ", ".join(badnodes)))
> +    return False
> +
> +  return True
> +
> +
> +def _GetRunning():
> +  """Determine the number of running jobs.
> +
> +  @rtype: int
> +  @return: the number of jobs still running
> +
> +  """
> +  cl = GetClient()
> +  qfilter = qlang.MakeSimpleFilter("status",
> +                                   frozenset([constants.JOB_STATUS_RUNNING]))
> +  return len(cl.Query(constants.QR_JOB, [], qfilter).data)
> +
> +
> +def _SetGanetiVersion(versionstring):
> +  """Set the active version of ganeti to the given versionstring
> +
> +  @type versionstring: string
> +  @rtype: list
> +  @return: the list of nodes where the version change failed
> +
> +  """
> +  failed = []
> +  failed.extend(_VerifyCommand(
> +      ["rm", "-f", os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")]))
> +  failed.extend(_VerifyCommand(
> +      ["ln", "-s", "-f", os.path.join(pathutils.PKGLIBDIR, versionstring),
> +       os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")]))
> +  failed.extend(_VerifyCommand(
> +      ["rm", "-f", os.path.join(pathutils.SYSCONFDIR, "ganeti/share")]))
> +  failed.extend(_VerifyCommand(
> +      ["ln", "-s", "-f", os.path.join(pathutils.SHAREDIR, versionstring),
> +       os.path.join(pathutils.SYSCONFDIR, "ganeti/share")]))
> +  return list(set(failed))
> +
> +
> +def _ExecuteCommands(fns):
> +  """Execute a list of functions, in reverse order.
> +
> +  @type fns: list of functions.
> +  @param fns: the functions to be executed.
> +
> +  """
> +  for fn in reversed(fns):
> +    fn()
> +
> +
> +# pylint: disable=R0911
> +def UpgradeGanetiCommand(opts, args):
> +  """Upgrade a cluster to a new ganeti version.
> +
> +  @param opts: the command line options selected by the user
> +  @type args: list
> +  @param args: should be an empty list
> +  @rtype: int
> +  @return: the desired exit code
> +
> +  """
> +  if ((not opts.resume and opts.to is None)
> +      or (opts.resume and opts.to is not None)):
> +    ToStderr("Precisely one of the options --to and --resume"
> +             " has to be given")
> +    return 1
> +
> +  if opts.resume:
> +    # TODO: implement
> +    ToStderr("The --resume mode is not yet implemented")
> +    return 1
> +
> +  rollback = []
> +
> +  versionstring = opts.to
> +  version = utils.version.ParseVersion(versionstring)
> +  if version is None:
> +    ToStderr("Could not parse version string %s" % versionstring)
> +    return 1
> +
> +  msg = utils.version.UpgradeRange(version)
> +  if msg is not None:
> +    ToStderr("Cannot upgrade to %s: %s" % (versionstring, msg))
> +    return 1
> +
> +  downgrade = utils.version.ShouldCfgdowngrade(version)
> +
> +  if not _VerifyVersionInstalled(versionstring):
> +    return 1
> +
> +  # TODO: write intent-to-upgrade file
> +
> +  ToStdout("Draining queue")
> +  client = GetClient()
> +  client.SetQueueDrainFlag(True)
> +
> +  rollback.append(lambda: GetClient().SetQueueDrainFlag(False))
> +
> +  if utils.SimpleRetry(0, _GetRunning,
> +                       constants.UPGRADE_QUEUE_POLL_INTERVAL,
> +                       constants.UPGRADE_QUEUE_DRAIN_TIMEOUT):
> +    ToStderr("Failed to completely empty the queue.")
> +    _ExecuteCommands(rollback)
> +    return 1
> +
> +  ToStdout("Stopping daemons on master node.")
> +  if not _RunCommandAndReport([pathutils.DAEMON_UTIL, "stop-all"]):
> +    _ExecuteCommands(rollback)
> +    return 1
> +
> +  if not _VerifyVersionInstalled(versionstring):
> +    utils.RunCmd([pathutils.DAEMON_UTIL, "start-all"])
> +    _ExecuteCommands(rollback)
> +    return 1
> +
> +  ToStdout("Stopping daemons everywhere.")
> +  rollback.append(lambda: _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"]))
> +  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "stop-all"])
> +  if badnodes:
> +    ToStderr("Failed to stop daemons on %s." % (", ".join(badnodes),))
> +    _ExecuteCommands(rollback)
> +    return 1
> +
> +  backuptar = os.path.join(pathutils.LOCALSTATEDIR,
> +                           "lib/ganeti%d.tar" % time.time())
> +  ToStdout("Backing up configuration as %s" % backuptar)
> +  if not _RunCommandAndReport(["tar", "cf", backuptar,
> +                               pathutils.DATA_DIR]):
> +    _ExecuteCommands(rollback)
> +    return 1
> +
> +  if downgrade:
> +    ToStdout("Downgrading configuration")
> +    if not _RunCommandAndReport([pathutils.CFGUPGRADE, "--downgrade", "-f"]):
> +      _ExecuteCommands(rollback)
> +      return 1
> +
> +  # Configuration change is the point of no return. From then onwards, it is
> +  # safer to push through the up/downgrade than to try to roll it back.
> +
> +  returnvalue = 0
> +
> +  ToStdout("Switching to version %s on all nodes" % versionstring)
> +  rollback.append(lambda: _SetGanetiVersion(constants.DIR_VERSION))
> +  badnodes = _SetGanetiVersion(versionstring)
> +  if badnodes:
> +    ToStderr("Failed to switch to Ganeti version %s on nodes %s"
> +             % (versionstring, ", ".join(badnodes)))
> +    if not downgrade:
> +      _ExecuteCommands(rollback)
> +      return 1
> +
> +  # Now that we have changed to the new version of Ganeti we should
> +  # not communicate over luxi any more, as luxi might have changed in
> +  # incompatible ways. Therefore, manually call the corresponding ganeti
> +  # commands using their canonical (version independent) path.
> +
> +  if not downgrade:
> +    ToStdout("Upgrading configuration")
> +    if not _RunCommandAndReport([pathutils.CFGUPGRADE, "-f"]):
> +      _ExecuteCommands(rollback)
> +      return 1
> +
> +  ToStdout("Starting daemons everywhere.")
> +  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"])
> +  if badnodes:
> +    ToStderr("Warning: failed to start daemons on %s." % (", ".join(badnodes),))
> +    returnvalue = 1
> +
> +  ToStdout("Ensuring directories everywhere.")
> +  badnodes = _VerifyCommand([pathutils.ENSURE_DIRS])
> +  if badnodes:
> +    ToStderr("Warning: failed to ensure directories on %s." %
> +             (", ".join(badnodes)))
> +    returnvalue = 1
> +
> +  ToStdout("Redistributing the configuration.")
> +  if not _RunCommandAndReport(["gnt-cluster", "redist-conf", "--yes-do-it"]):
> +    returnvalue = 1
> +
> +  ToStdout("Restarting daemons everywhere.")
> +  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "stop-all"])
> +  badnodes.extend(_VerifyCommand([pathutils.DAEMON_UTIL, "start-all"]))
> +  if badnodes:
> +    ToStderr("Warning: failed to start daemons on %s." %
> +             (", ".join(list(set(badnodes))),))
> +    returnvalue = 1
> +
> +  ToStdout("Undraining the queue.")
> +  if not _RunCommandAndReport(["gnt-cluster", "queue", "undrain"]):
> +    returnvalue = 1
> +
> +  # TODO: write intent-to-upgrade file
> +
> +  ToStdout("Verifying cluster.")
> +  if not _RunCommandAndReport(["gnt-cluster", "verify"]):
> +    returnvalue = 1
> +
> +  return returnvalue
> +
> +
>  commands = {
>    "init": (
>      InitCluster, [ArgHost(min=1, max=1)],
> @@ -1735,6 +2015,9 @@ commands = {
>    "show-ispecs-cmd": (
>      ShowCreateCommand, ARGS_NONE, [], "",
>      "Show the command line to re-create the cluster"),
> +  "upgrade": (
> +    UpgradeGanetiCommand, ARGS_NONE, [TO_OPT, RESUME_OPT], "",
> +    "Upgrade (or downgrade) to a new Ganeti version"),
>    }
>  
>  
> -- 
> 1.8.4
> 

-- 
Jose Antonio Lopes
Ganeti Engineering
Google Germany GmbH
Dienerstr. 12, 80331, München

Registergericht und -nummer: Hamburg, HRB 86891
Sitz der Gesellschaft: Hamburg
Geschäftsführer: Graham Law, Christine Elizabeth Flores
Steuernummer: 48/725/00206
Umsatzsteueridentifikationsnummer: DE813741370

Reply via email to