With masterd no longer in use, quite a few of its classes
and helper functions are not needed any more. Remove them,
together with the imports that only they used.
Signed-off-by: Klaus Aehlig <[email protected]>
---
lib/server/masterd.py | 170 --------------------------------------------------
1 file changed, 170 deletions(-)
diff --git a/lib/server/masterd.py b/lib/server/masterd.py
index cff55b7..bc39353 100644
--- a/lib/server/masterd.py
+++ b/lib/server/masterd.py
@@ -38,18 +38,13 @@ inheritance from parent classes requires it.
# pylint: disable=C0103
# C0103: Invalid name ganeti-masterd
-import time
import logging
from ganeti import config
from ganeti import constants
-from ganeti import daemon
from ganeti import jqueue
-from ganeti import luxi
from ganeti import utils
-from ganeti import errors
import ganeti.rpc.node as rpc
-from ganeti import ht
CLIENT_REQUEST_WORKERS = 16
@@ -58,137 +53,6 @@ EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
-def _LogNewJob(status, info, ops):
- """Log information about a recently submitted job.
-
- """
- op_summary = utils.CommaJoin(op.Summary() for op in ops)
-
- if status:
- logging.info("New job with id %s, summary: %s", info, op_summary)
- else:
- logging.info("Failed to submit job, reason: '%s', summary: %s",
- info, op_summary)
-
-
-class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
- """Handler for master peers.
-
- """
- _MAX_UNHANDLED = 1
-
- def __init__(self, server, connected_socket, client_address, family):
- daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
- client_address,
- constants.LUXI_EOM,
- family, self._MAX_UNHANDLED)
- self.server = server
-
- def handle_message(self, message, _):
- self.server.request_workers.AddTask((self.server, message, self))
-
-
-class _MasterShutdownCheck(object):
- """Logic for master daemon shutdown.
-
- """
- #: How long to wait between checks
- _CHECK_INTERVAL = 5.0
-
- #: How long to wait after all jobs are done (e.g. to give clients time to
- #: retrieve the job status)
- _SHUTDOWN_LINGER = 5.0
-
- def __init__(self):
- """Initializes this class.
-
- """
- self._had_active_jobs = None
- self._linger_timeout = None
-
- def __call__(self, jq_prepare_result):
- """Determines if master daemon is ready for shutdown.
-
- @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
- @rtype: None or number
- @return: None if master daemon is ready, timeout if the check must be
- repeated
-
- """
- if jq_prepare_result:
- # Check again shortly
- logging.info("Job queue has been notified for shutdown but is still"
- " busy; next check in %s seconds", self._CHECK_INTERVAL)
- self._had_active_jobs = True
- return self._CHECK_INTERVAL
-
- if not self._had_active_jobs:
- # Can shut down as there were no active jobs on the first check
- return None
-
- # No jobs are running anymore, but maybe some clients want to collect some
- # information. Give them a short amount of time.
- if self._linger_timeout is None:
- self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)
-
- remaining = self._linger_timeout.Remaining()
-
- logging.info("Job queue no longer busy; shutting down master daemon"
- " in %s seconds", remaining)
-
- # TODO: Should the master daemon socket be closed at this point? Doing so
- # wouldn't affect existing connections.
-
- if remaining < 0:
- return None
- else:
- return remaining
-
-
-class ClientOps(object):
- """Class holding high-level client operations."""
- def __init__(self, server):
- self.server = server
-
- @staticmethod
- def _PickupJob(args, queue):
- logging.info("Picking up new job from queue")
- (job_id, ) = args
- queue.PickupJob(job_id)
- return job_id
-
- @staticmethod
- def _ChangeJobPriority(args, queue):
- (job_id, priority) = args
- logging.info("Received request to change priority for job %s to %s",
- job_id, priority)
- return queue.ChangeJobPriority(job_id, priority)
-
- def handle_request(self, method, args): # pylint: disable=R0911
- context = self.server.context
- queue = context.jobqueue
-
- # TODO: Parameter validation
- if not isinstance(args, (tuple, list)):
- logging.info("Received invalid arguments of type '%s'", type(args))
- raise ValueError("Invalid arguments type '%s'" % type(args))
-
- if method not in luxi.REQ_ALL:
- logging.info("Received invalid request '%s'", method)
- raise ValueError("Invalid operation '%s'" % method)
-
- job_id = None
- if method == luxi.REQ_PICKUP_JOB:
- job_id = self._PickupJob(args, queue)
- elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
- job_id = self._ChangeJobPriority(args, queue)
- else:
- logging.info("Request '%s' not supported by masterd", method)
- raise ValueError("Unsupported operation '%s'" % method)
-
- return job_id
-
-
class GanetiContext(object):
"""Context common to all ganeti threads.
@@ -266,37 +130,3 @@ class GanetiContext(object):
# Notify job queue
self.jobqueue.RemoveNode(node.name)
-
-
-def _SetWatcherPause(context, ec_id, until):
- """Creates or removes the watcher pause file.
-
- @type context: L{GanetiContext}
- @param context: Global Ganeti context
- @type until: None or int
- @param until: Unix timestamp saying until when the watcher shouldn't run
-
- """
- node_names = context.GetConfig(ec_id).GetNodeList()
-
- if until is None:
- logging.info("Received request to no longer pause watcher")
- else:
- if not ht.TNumber(until):
- raise TypeError("Duration must be numeric")
-
- if until < time.time():
- raise errors.GenericError("Unable to set pause end time in the past")
-
- logging.info("Received request to pause watcher until %s", until)
-
- result = context.rpc.call_set_watcher_pause(node_names, until)
-
- errmsg = utils.CommaJoin("%s (%s)" % (node_name, nres.fail_msg)
- for (node_name, nres) in result.items()
- if nres.fail_msg and not nres.offline)
- if errmsg:
- raise errors.OpExecError("Watcher pause was set where possible, but failed"
- " on the following node(s): %s" % errmsg)
-
- return until
--
2.2.0.rc0.207.ga3a616c