... and by that fixing a TODO as well.
Signed-off-by: Helga Velroyen <[email protected]>
---
lib/server/masterd.py | 284 +++++++++++++++++++++++++++++---------------------
1 file changed, 166 insertions(+), 118 deletions(-)
diff --git a/lib/server/masterd.py b/lib/server/masterd.py
index adfb914..ffd7deb 100644
--- a/lib/server/masterd.py
+++ b/lib/server/masterd.py
@@ -266,6 +266,152 @@ class ClientOps(object):
def __init__(self, server):
self.server = server
+ @staticmethod
+ def _SubmitJob(args, queue):
+ logging.info("Receiving new job")
+ (job_def, ) = args
+ ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
+ job_id = queue.SubmitJob(ops)
+ _LogNewJob(True, job_id, ops)
+ return job_id
+
+ @staticmethod
+ def _PickupJob(args, queue):
+ logging.info("Picking up new job from queue")
+ (job_id, ) = args
+ queue.PickupJob(job_id)
+ return job_id
+
+ @staticmethod
+ def _SubmitJobToDrainedQueue(args, queue):
+ logging.info("Forcefully receiving new job")
+ (job_def, ) = args
+ ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
+ job_id = queue.SubmitJobToDrainedQueue(ops)
+ _LogNewJob(True, job_id, ops)
+ return job_id
+
+ @staticmethod
+ def _SubmitManyJobs(args, queue):
+ logging.info("Receiving multiple jobs")
+ (job_defs, ) = args
+ jobs = []
+ for ops in job_defs:
+ jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
+ job_ids = queue.SubmitManyJobs(jobs)
+ for ((status, job_id), ops) in zip(job_ids, jobs):
+ _LogNewJob(status, job_id, ops)
+ return job_ids
+
+ @staticmethod
+ def _CancelJob(args, queue):
+ (job_id, ) = args
+ logging.info("Received job cancel request for %s", job_id)
+ return queue.CancelJob(job_id)
+
+ @staticmethod
+ def _ChangeJobPriority(args, queue):
+ (job_id, priority) = args
+ logging.info("Received request to change priority for job %s to %s",
+ job_id, priority)
+ return queue.ChangeJobPriority(job_id, priority)
+
+ @staticmethod
+ def _ArchiveJob(args, queue):
+ (job_id, ) = args
+ logging.info("Received job archive request for %s", job_id)
+ return queue.ArchiveJob(job_id)
+
+ @staticmethod
+ def _AutoArchiveJobs(args, queue):
+ (age, timeout) = args
+ logging.info("Received job autoarchive request for age %s, timeout %s",
+ age, timeout)
+ return queue.AutoArchiveJobs(age, timeout)
+
+ @staticmethod
+ def _WaitForJobChange(args, queue):
+ (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
+ logging.info("Received job poll request for %s", job_id)
+ return queue.WaitForJobChanges(job_id, fields, prev_job_info,
+ prev_log_serial, timeout)
+
+ def _PerformQuery(self, args, queue):
+ (what, fields, qfilter) = args
+
+ if what in constants.QR_VIA_OP:
+ result = self._Query(opcodes.OpQuery(what=what, fields=fields,
+ qfilter=qfilter))
+ elif what == constants.QR_LOCK:
+ if qfilter is not None:
+ raise errors.OpPrereqError("Lock queries can't be filtered",
+ errors.ECODE_INVAL)
+ return self.server.context.glm.QueryLocks(fields)
+ elif what == constants.QR_JOB:
+ return queue.QueryJobs(fields, qfilter)
+ elif what in constants.QR_VIA_LUXI:
+ luxi_client = runtime.GetClient()
+ result = luxi_client.Query(what, fields, qfilter).ToDict()
+ else:
+ raise errors.OpPrereqError("Resource type '%s' unknown" % what,
+ errors.ECODE_INVAL)
+
+ return result
+
+ @staticmethod
+ def _QueryFields(args):
+ (what, fields) = args
+ req = objects.QueryFieldsRequest(what=what, fields=fields)
+
+ try:
+ fielddefs = query.ALL_FIELDS[req.what]
+ except KeyError:
+ raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
+ errors.ECODE_INVAL)
+
+ return query.QueryFields(fielddefs, req.fields)
+
+ @staticmethod
+ def _QueryJobs(args, queue):
+ (job_ids, fields) = args
+ if isinstance(job_ids, (tuple, list)) and job_ids:
+ msg = utils.CommaJoin(job_ids)
+ else:
+ msg = str(job_ids)
+ logging.info("Received job query request for %s", msg)
+ return queue.OldStyleQueryJobs(job_ids, fields)
+
+ def _QueryConfigValues(self, args):
+ (fields, ) = args
+ logging.info("Received config values query request for %s", fields)
+ op = opcodes.OpClusterConfigQuery(output_fields=fields)
+ return self._Query(op)
+
+ def _QueryClusterInfo(self):
+ logging.info("Received cluster info query request")
+ op = opcodes.OpClusterQuery()
+ return self._Query(op)
+
+ def _QueryTags(self, args):
+ (kind, name) = args
+ logging.info("Received tags query request")
+ op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
+ return self._Query(op)
+
+ @staticmethod
+ def _SetDrainFlag(args, queue):
+ (drain_flag, ) = args
+ logging.info("Received queue drain flag change request to %s",
+ drain_flag)
+ return queue.SetDrainFlag(drain_flag)
+
+ @staticmethod
+ def _SetWatcherPause(args, context):
+ (until, ) = args
+ # FIXME!
+ ec_id = None
+ return _SetWatcherPause(context, ec_id, until)
+
def handle_request(self, method, args): # pylint: disable=R0911
context = self.server.context
queue = context.jobqueue
@@ -279,146 +425,48 @@ class ClientOps(object):
logging.info("Received invalid request '%s'", method)
raise ValueError("Invalid operation '%s'" % method)
- # TODO: Rewrite to not exit in each 'if/elif' branch
-
+ job_id = None
if method == luxi.REQ_SUBMIT_JOB:
- logging.info("Receiving new job")
- (job_def, ) = args
- ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
- job_id = queue.SubmitJob(ops)
- _LogNewJob(True, job_id, ops)
- return job_id
-
+ job_id = self._SubmitJob(args, queue)
elif method == luxi.REQ_PICKUP_JOB:
- logging.info("Picking up new job from queue")
- (job_id, ) = args
- queue.PickupJob(job_id)
-
+ job_id = self._PickupJob(args, queue)
elif method == luxi.REQ_SUBMIT_JOB_TO_DRAINED_QUEUE:
- logging.info("Forcefully receiving new job")
- (job_def, ) = args
- ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
- job_id = queue.SubmitJobToDrainedQueue(ops)
- _LogNewJob(True, job_id, ops)
- return job_id
-
+ job_id = self._SubmitJobToDrainedQueue(args, queue)
elif method == luxi.REQ_SUBMIT_MANY_JOBS:
- logging.info("Receiving multiple jobs")
- (job_defs, ) = args
- jobs = []
- for ops in job_defs:
- jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
- job_ids = queue.SubmitManyJobs(jobs)
- for ((status, job_id), ops) in zip(job_ids, jobs):
- _LogNewJob(status, job_id, ops)
- return job_ids
-
+ job_id = self._SubmitManyJobs(args, queue)
elif method == luxi.REQ_CANCEL_JOB:
- (job_id, ) = args
- logging.info("Received job cancel request for %s", job_id)
- return queue.CancelJob(job_id)
-
+ job_id = self._CancelJob(args, queue)
elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
- (job_id, priority) = args
- logging.info("Received request to change priority for job %s to %s",
- job_id, priority)
- return queue.ChangeJobPriority(job_id, priority)
-
+ job_id = self._ChangeJobPriority(args, queue)
elif method == luxi.REQ_ARCHIVE_JOB:
- (job_id, ) = args
- logging.info("Received job archive request for %s", job_id)
- return queue.ArchiveJob(job_id)
-
+ job_id = self._ArchiveJob(args, queue)
elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
- (age, timeout) = args
- logging.info("Received job autoarchive request for age %s, timeout %s",
- age, timeout)
- return queue.AutoArchiveJobs(age, timeout)
-
+ job_id = self._AutoArchiveJobs(args, queue)
elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
- (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
- logging.info("Received job poll request for %s", job_id)
- return queue.WaitForJobChanges(job_id, fields, prev_job_info,
- prev_log_serial, timeout)
-
+ job_id = self._WaitForJobChange(args, queue)
elif method == luxi.REQ_QUERY:
- (what, fields, qfilter) = args
-
- if what in constants.QR_VIA_OP:
- result = self._Query(opcodes.OpQuery(what=what, fields=fields,
- qfilter=qfilter))
- elif what == constants.QR_LOCK:
- if qfilter is not None:
- raise errors.OpPrereqError("Lock queries can't be filtered",
- errors.ECODE_INVAL)
- return context.glm.QueryLocks(fields)
- elif what == constants.QR_JOB:
- return queue.QueryJobs(fields, qfilter)
- elif what in constants.QR_VIA_LUXI:
- luxi_client = runtime.GetClient()
- result = luxi_client.Query(what, fields, qfilter).ToDict()
- else:
- raise errors.OpPrereqError("Resource type '%s' unknown" % what,
- errors.ECODE_INVAL)
-
- return result
-
+ job_id = self._PerformQuery(args, queue)
elif method == luxi.REQ_QUERY_FIELDS:
- (what, fields) = args
- req = objects.QueryFieldsRequest(what=what, fields=fields)
-
- try:
- fielddefs = query.ALL_FIELDS[req.what]
- except KeyError:
- raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
- errors.ECODE_INVAL)
-
- return query.QueryFields(fielddefs, req.fields)
-
+ job_id = self._QueryFields(args)
elif method == luxi.REQ_QUERY_JOBS:
- (job_ids, fields) = args
- if isinstance(job_ids, (tuple, list)) and job_ids:
- msg = utils.CommaJoin(job_ids)
- else:
- msg = str(job_ids)
- logging.info("Received job query request for %s", msg)
- return queue.OldStyleQueryJobs(job_ids, fields)
-
+ job_id = self._QueryJobs(args, queue)
elif method == luxi.REQ_QUERY_CONFIG_VALUES:
- (fields, ) = args
- logging.info("Received config values query request for %s", fields)
- op = opcodes.OpClusterConfigQuery(output_fields=fields)
- return self._Query(op)
-
+ job_id = self._QueryConfigValues(args)
elif method == luxi.REQ_QUERY_CLUSTER_INFO:
- logging.info("Received cluster info query request")
- op = opcodes.OpClusterQuery()
- return self._Query(op)
-
+ job_id = self._QueryClusterInfo()
elif method == luxi.REQ_QUERY_TAGS:
- (kind, name) = args
- logging.info("Received tags query request")
- op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
- return self._Query(op)
-
+ job_id = self._QueryTags(args)
elif method == luxi.REQ_SET_DRAIN_FLAG:
- (drain_flag, ) = args
- logging.info("Received queue drain flag change request to %s",
- drain_flag)
- return queue.SetDrainFlag(drain_flag)
-
+ job_id = self._SetDrainFlag(args, queue)
elif method == luxi.REQ_SET_WATCHER_PAUSE:
- (until, ) = args
-
- # FIXME!
- ec_id = None
- return _SetWatcherPause(context, ec_id, until)
-
+ job_id = self._SetWatcherPause(args, context)
else:
logging.critical("Request '%s' in luxi.REQ_ALL, but not known", method)
raise errors.ProgrammerError("Operation '%s' in luxi.REQ_ALL,"
" but not implemented" % method)
+ return job_id
+
def _Query(self, op):
"""Runs the specified opcode and returns the result.
--
1.9.1.423.g4596e3a