On May 23 15:21, 'Helga Velroyen' via ganeti-devel wrote:
> ... and by that fixing a TODO as well.
>
> Signed-off-by: Helga Velroyen <[email protected]>
> ---
> lib/server/masterd.py | 284
> +++++++++++++++++++++++++++++---------------------
> 1 file changed, 166 insertions(+), 118 deletions(-)
>
> diff --git a/lib/server/masterd.py b/lib/server/masterd.py
> index adfb914..ffd7deb 100644
> --- a/lib/server/masterd.py
> +++ b/lib/server/masterd.py
> @@ -266,6 +266,152 @@ class ClientOps(object):
> def __init__(self, server):
> self.server = server
>
> + @staticmethod
> + def _SubmitJob(args, queue):
> + logging.info("Receiving new job")
> + (job_def, ) = args
> + ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
> + job_id = queue.SubmitJob(ops)
> + _LogNewJob(True, job_id, ops)
> + return job_id
> +
> + @staticmethod
> + def _PickupJob(args, queue):
> + logging.info("Picking up new job from queue")
> + (job_id, ) = args
> + queue.PickupJob(job_id)
> + return job_id
> +
> + @staticmethod
> + def _SubmitJobToDrainedQueue(args, queue):
> + logging.info("Forcefully receiving new job")
> + (job_def, ) = args
> + ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
> + job_id = queue.SubmitJobToDrainedQueue(ops)
> + _LogNewJob(True, job_id, ops)
> + return job_id
> +
> + @staticmethod
> + def _SubmitManyJobs(args, queue):
> + logging.info("Receiving multiple jobs")
> + (job_defs, ) = args
> + jobs = []
> + for ops in job_defs:
> + jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
> + job_ids = queue.SubmitManyJobs(jobs)
> + for ((status, job_id), ops) in zip(job_ids, jobs):
> + _LogNewJob(status, job_id, ops)
> + return job_ids
> +
> + @staticmethod
> + def _CancelJob(args, queue):
> + (job_id, ) = args
> + logging.info("Received job cancel request for %s", job_id)
> + return queue.CancelJob(job_id)
> +
> + @staticmethod
> + def _ChangeJobPriority(args, queue):
> + (job_id, priority) = args
> + logging.info("Received request to change priority for job %s to %s",
> + job_id, priority)
> + return queue.ChangeJobPriority(job_id, priority)
> +
> + @staticmethod
> + def _ArchiveJob(args, queue):
> + (job_id, ) = args
> + logging.info("Received job archive request for %s", job_id)
> + return queue.ArchiveJob(job_id)
> +
> + @staticmethod
> + def _AutoArchiveJobs(args, queue):
> + (age, timeout) = args
> + logging.info("Received job autoarchive request for age %s, timeout %s",
> + age, timeout)
> + return queue.AutoArchiveJobs(age, timeout)
> +
> + @staticmethod
> + def _WaitForJobChange(args, queue):
> + (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
> + logging.info("Received job poll request for %s", job_id)
> + return queue.WaitForJobChanges(job_id, fields, prev_job_info,
> + prev_log_serial, timeout)
> +
> + def _PerformQuery(self, args, queue):
> + (what, fields, qfilter) = args
> +
> + if what in constants.QR_VIA_OP:
> + result = self._Query(opcodes.OpQuery(what=what, fields=fields,
> + qfilter=qfilter))
> + elif what == constants.QR_LOCK:
> + if qfilter is not None:
> + raise errors.OpPrereqError("Lock queries can't be filtered",
> + errors.ECODE_INVAL)
> + return self.server.context.glm.QueryLocks(fields)
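Instead of returning directly from this branch, assign to 'result' so that the function exits only through the final 'return result':
result = self.server.context.glm.QueryLocks(fields)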
> + elif what == constants.QR_JOB:
> + return queue.QueryJobs(fields, qfilter)
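Instead of returning directly from this branch, assign to 'result' so that the function exits only through the final 'return result':
result = queue.QueryJobs(fields, qfilter)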
Rest LGTM. No need to resend.
Thanks,
Jose
> + elif what in constants.QR_VIA_LUXI:
> + luxi_client = runtime.GetClient()
> + result = luxi_client.Query(what, fields, qfilter).ToDict()
> + else:
> + raise errors.OpPrereqError("Resource type '%s' unknown" % what,
> + errors.ECODE_INVAL)
> +
> + return result
> +
> + @staticmethod
> + def _QueryFields(args):
> + (what, fields) = args
> + req = objects.QueryFieldsRequest(what=what, fields=fields)
> +
> + try:
> + fielddefs = query.ALL_FIELDS[req.what]
> + except KeyError:
> + raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
> + errors.ECODE_INVAL)
> +
> + return query.QueryFields(fielddefs, req.fields)
> +
> + @staticmethod
> + def _QueryJobs(args, queue):
> + (job_ids, fields) = args
> + if isinstance(job_ids, (tuple, list)) and job_ids:
> + msg = utils.CommaJoin(job_ids)
> + else:
> + msg = str(job_ids)
> + logging.info("Received job query request for %s", msg)
> + return queue.OldStyleQueryJobs(job_ids, fields)
> +
> + def _QueryConfigValues(self, args):
> + (fields, ) = args
> + logging.info("Received config values query request for %s", fields)
> + op = opcodes.OpClusterConfigQuery(output_fields=fields)
> + return self._Query(op)
> +
> + def _QueryClusterInfo(self):
> + logging.info("Received cluster info query request")
> + op = opcodes.OpClusterQuery()
> + return self._Query(op)
> +
> + def _QueryTags(self, args):
> + (kind, name) = args
> + logging.info("Received tags query request")
> + op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
> + return self._Query(op)
> +
> + @staticmethod
> + def _SetDrainFlag(args, queue):
> + (drain_flag, ) = args
> + logging.info("Received queue drain flag change request to %s",
> + drain_flag)
> + return queue.SetDrainFlag(drain_flag)
> +
> + @staticmethod
> + def _SetWatcherPause(args, context):
> + (until, ) = args
> + # FIXME!
> + ec_id = None
> + return _SetWatcherPause(context, ec_id, until)
> +
> def handle_request(self, method, args): # pylint: disable=R0911
> context = self.server.context
> queue = context.jobqueue
> @@ -279,146 +425,48 @@ class ClientOps(object):
> logging.info("Received invalid request '%s'", method)
> raise ValueError("Invalid operation '%s'" % method)
>
> - # TODO: Rewrite to not exit in each 'if/elif' branch
> -
> + job_id = None
> if method == luxi.REQ_SUBMIT_JOB:
> - logging.info("Receiving new job")
> - (job_def, ) = args
> - ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
> - job_id = queue.SubmitJob(ops)
> - _LogNewJob(True, job_id, ops)
> - return job_id
> -
> + job_id = self._SubmitJob(args, queue)
> elif method == luxi.REQ_PICKUP_JOB:
> - logging.info("Picking up new job from queue")
> - (job_id, ) = args
> - queue.PickupJob(job_id)
> -
> + job_id = self._PickupJob(args, queue)
> elif method == luxi.REQ_SUBMIT_JOB_TO_DRAINED_QUEUE:
> - logging.info("Forcefully receiving new job")
> - (job_def, ) = args
> - ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
> - job_id = queue.SubmitJobToDrainedQueue(ops)
> - _LogNewJob(True, job_id, ops)
> - return job_id
> -
> + job_id = self._SubmitJobToDrainedQueue(args, queue)
> elif method == luxi.REQ_SUBMIT_MANY_JOBS:
> - logging.info("Receiving multiple jobs")
> - (job_defs, ) = args
> - jobs = []
> - for ops in job_defs:
> - jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
> - job_ids = queue.SubmitManyJobs(jobs)
> - for ((status, job_id), ops) in zip(job_ids, jobs):
> - _LogNewJob(status, job_id, ops)
> - return job_ids
> -
> + job_id = self._SubmitManyJobs(args, queue)
> elif method == luxi.REQ_CANCEL_JOB:
> - (job_id, ) = args
> - logging.info("Received job cancel request for %s", job_id)
> - return queue.CancelJob(job_id)
> -
> + job_id = self._CancelJob(args, queue)
> elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
> - (job_id, priority) = args
> - logging.info("Received request to change priority for job %s to %s",
> - job_id, priority)
> - return queue.ChangeJobPriority(job_id, priority)
> -
> + job_id = self._ChangeJobPriority(args, queue)
> elif method == luxi.REQ_ARCHIVE_JOB:
> - (job_id, ) = args
> - logging.info("Received job archive request for %s", job_id)
> - return queue.ArchiveJob(job_id)
> -
> + job_id = self._ArchiveJob(args, queue)
> elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
> - (age, timeout) = args
> - logging.info("Received job autoarchive request for age %s, timeout %s",
> - age, timeout)
> - return queue.AutoArchiveJobs(age, timeout)
> -
> + job_id = self._AutoArchiveJobs(args, queue)
> elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
> - (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
> - logging.info("Received job poll request for %s", job_id)
> - return queue.WaitForJobChanges(job_id, fields, prev_job_info,
> - prev_log_serial, timeout)
> -
> + job_id = self._WaitForJobChange(args, queue)
> elif method == luxi.REQ_QUERY:
> - (what, fields, qfilter) = args
> -
> - if what in constants.QR_VIA_OP:
> - result = self._Query(opcodes.OpQuery(what=what, fields=fields,
> - qfilter=qfilter))
> - elif what == constants.QR_LOCK:
> - if qfilter is not None:
> - raise errors.OpPrereqError("Lock queries can't be filtered",
> - errors.ECODE_INVAL)
> - return context.glm.QueryLocks(fields)
> - elif what == constants.QR_JOB:
> - return queue.QueryJobs(fields, qfilter)
> - elif what in constants.QR_VIA_LUXI:
> - luxi_client = runtime.GetClient()
> - result = luxi_client.Query(what, fields, qfilter).ToDict()
> - else:
> - raise errors.OpPrereqError("Resource type '%s' unknown" % what,
> - errors.ECODE_INVAL)
> -
> - return result
> -
> + job_id = self._PerformQuery(args, queue)
> elif method == luxi.REQ_QUERY_FIELDS:
> - (what, fields) = args
> - req = objects.QueryFieldsRequest(what=what, fields=fields)
> -
> - try:
> - fielddefs = query.ALL_FIELDS[req.what]
> - except KeyError:
> - raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
> - errors.ECODE_INVAL)
> -
> - return query.QueryFields(fielddefs, req.fields)
> -
> + job_id = self._QueryFields(args)
> elif method == luxi.REQ_QUERY_JOBS:
> - (job_ids, fields) = args
> - if isinstance(job_ids, (tuple, list)) and job_ids:
> - msg = utils.CommaJoin(job_ids)
> - else:
> - msg = str(job_ids)
> - logging.info("Received job query request for %s", msg)
> - return queue.OldStyleQueryJobs(job_ids, fields)
> -
> + job_id = self._QueryJobs(args, queue)
> elif method == luxi.REQ_QUERY_CONFIG_VALUES:
> - (fields, ) = args
> - logging.info("Received config values query request for %s", fields)
> - op = opcodes.OpClusterConfigQuery(output_fields=fields)
> - return self._Query(op)
> -
> + job_id = self._QueryConfigValues(args)
> elif method == luxi.REQ_QUERY_CLUSTER_INFO:
> - logging.info("Received cluster info query request")
> - op = opcodes.OpClusterQuery()
> - return self._Query(op)
> -
> + job_id = self._QueryClusterInfo()
> elif method == luxi.REQ_QUERY_TAGS:
> - (kind, name) = args
> - logging.info("Received tags query request")
> - op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
> - return self._Query(op)
> -
> + job_id = self._QueryTags(args)
> elif method == luxi.REQ_SET_DRAIN_FLAG:
> - (drain_flag, ) = args
> - logging.info("Received queue drain flag change request to %s",
> - drain_flag)
> - return queue.SetDrainFlag(drain_flag)
> -
> + job_id = self._SetDrainFlag(args, queue)
> elif method == luxi.REQ_SET_WATCHER_PAUSE:
> - (until, ) = args
> -
> - # FIXME!
> - ec_id = None
> - return _SetWatcherPause(context, ec_id, until)
> -
> + job_id = self._SetWatcherPause(args, context)
> else:
> logging.critical("Request '%s' in luxi.REQ_ALL, but not known", method)
> raise errors.ProgrammerError("Operation '%s' in luxi.REQ_ALL,"
> " but not implemented" % method)
>
> + return job_id
> +
> def _Query(self, op):
> """Runs the specified opcode and returns the result.
>
> --
> 1.9.1.423.g4596e3a
>
--
Jose Antonio Lopes
Ganeti Engineering
Google Germany GmbH
Dienerstr. 12, 80331, München
Registergericht und -nummer: Hamburg, HRB 86891
Sitz der Gesellschaft: Hamburg
Geschäftsführer: Graham Law, Christine Elizabeth Flores
Steuernummer: 48/725/00206
Umsatzsteueridentifikationsnummer: DE813741370