As the actual queue management has now moved to wconfd, the Python functions for queue management have become obsolete.
Signed-off-by: Klaus Aehlig <[email protected]> --- lib/jqueue/__init__.py | 107 ------------------------------------------------- lib/server/masterd.py | 27 ------------- 2 files changed, 134 deletions(-) diff --git a/lib/jqueue/__init__.py b/lib/jqueue/__init__.py index ceb3e96..4d04079 100644 --- a/lib/jqueue/__init__.py +++ b/lib/jqueue/__init__.py @@ -1498,67 +1498,6 @@ class JobQueue(object): """ return rpc.JobQueueRunner(self.context, address_list) - @locking.ssynchronized(_LOCK) - def AddNode(self, node): - """Register a new node with the queue. - - @type node: L{objects.Node} - @param node: the node object to be added - - """ - node_name = node.name - assert node_name != self._my_hostname - - # Clean queue directory on added node - result = self._GetRpc(None).call_jobqueue_purge(node_name) - msg = result.fail_msg - if msg: - logging.warning("Cannot cleanup queue directory on node %s: %s", - node_name, msg) - - if not node.master_candidate: - # remove if existing, ignoring errors - self._nodes.pop(node_name, None) - # and skip the replication of the job ids - return - - # Upload the whole queue excluding archived jobs - files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()] - - # Upload current serial file - files.append(pathutils.JOB_QUEUE_SERIAL_FILE) - - # Static address list - addrs = [node.primary_ip] - - for file_name in files: - # Read file content - content = utils.ReadFile(file_name) - - result = _CallJqUpdate(self._GetRpc(addrs), [node_name], - file_name, content) - msg = result[node_name].fail_msg - if msg: - logging.error("Failed to upload file %s to node %s: %s", - file_name, node_name, msg) - - msg = result[node_name].fail_msg - if msg: - logging.error("Failed to set queue drained flag on node %s: %s", - node_name, msg) - - self._nodes[node_name] = node.primary_ip - - @locking.ssynchronized(_LOCK) - def RemoveNode(self, node_name): - """Callback called when removing nodes from the cluster. 
- - @type node_name: str - @param node_name: the name of the node to remove - - """ - self._nodes.pop(node_name, None) - @staticmethod def _CheckRpcResult(result, nodes, failmsg): """Verifies the status of an RPC call. @@ -1851,14 +1790,6 @@ class JobQueue(object): return luxi.Client(address=pathutils.QUERY_SOCKET).SubmitManyJobs(jobs) @staticmethod - def _FormatSubmitError(msg, ops): - """Formats errors which occurred while submitting a job. - - """ - return ("%s; opcodes %s" % - (msg, utils.CommaJoin(op.Summary() for op in ops))) - - @staticmethod def _ResolveJobDependencies(resolve_fn, deps): """Resolves relative job IDs in dependencies. @@ -2049,44 +1980,6 @@ class JobQueue(object): return (success, msg) - def _ArchiveJobsUnlocked(self, jobs): - """Archives jobs. - - @type jobs: list of L{_QueuedJob} - @param jobs: Job objects - @rtype: int - @return: Number of archived jobs - - """ - archive_jobs = [] - rename_files = [] - for job in jobs: - assert job.writable, "Can't archive read-only job" - assert not job.archived, "Can't cancel archived job" - - if job.CalcStatus() not in constants.JOBS_FINALIZED: - logging.debug("Job %s is not yet done", job.id) - continue - - archive_jobs.append(job) - - old = self._GetJobPath(job.id) - new = self._GetArchivedJobPath(job.id) - rename_files.append((old, new)) - - # TODO: What if 1..n files fail to rename? - self._RenameFilesUnlocked(rename_files) - - logging.debug("Successfully archived job(s) %s", - utils.CommaJoin(job.id for job in archive_jobs)) - - # Since we haven't quite checked, above, if we succeeded or failed renaming - # the files, we update the cached queue size from the filesystem. When we - # get around to fix the TODO: above, we can use the number of actually - # archived jobs to fix this. - self._UpdateQueueSizeUnlocked() - return len(archive_jobs) - @locking.ssynchronized(_LOCK) def PrepareShutdown(self): """Prepare to stop the job queue. 
diff --git a/lib/server/masterd.py b/lib/server/masterd.py index bc39353..4e31236 100644 --- a/lib/server/masterd.py +++ b/lib/server/masterd.py @@ -103,30 +103,3 @@ class GanetiContext(object): # method could be a function, but keep interface backwards compatible def GetRpc(self, cfg): return rpc.RpcRunner(cfg, lambda _: None) - - def AddNode(self, cfg, node, ec_id): - """Adds a node to the configuration. - - """ - # Add it to the configuration - cfg.AddNode(node, ec_id) - - # If preseeding fails it'll not be added - self.jobqueue.AddNode(node) - - def ReaddNode(self, node): - """Updates a node that's already in the configuration - - """ - # Synchronize the queue again - self.jobqueue.AddNode(node) - - def RemoveNode(self, cfg, node): - """Removes a node from the configuration and lock manager. - - """ - # Remove node from configuration - cfg.RemoveNode(node.uuid) - - # Notify job queue - self.jobqueue.RemoveNode(node.name) -- 2.2.0.rc0.207.ga3a616c
