Not much changes with this patch. The main loop for the IOServer is replaced by mainloop.Run() and the main thread now uses asyncore to handle connections to the master socket. Once it accepts them, though, it just pushes them to the current infrastructure, and everything proceeds as before.
Signed-off-by: Guido Trotter <[email protected]> --- daemons/ganeti-masterd | 70 ++++++++++++++++------------------------------- 1 files changed, 24 insertions(+), 46 deletions(-) diff --git a/daemons/ganeti-masterd b/daemons/ganeti-masterd index 24fa827..7dcc0b4 100755 --- a/daemons/ganeti-masterd +++ b/daemons/ganeti-masterd @@ -30,10 +30,10 @@ inheritance from parent classes requires it. # C0103: Invalid name ganeti-masterd import sys +import socket import SocketServer import time import collections -import signal import logging from optparse import OptionParser @@ -65,69 +65,46 @@ class ClientRequestWorker(workerpool.BaseWorker): def RunTask(self, server, request, client_address): """Process the request. - This is copied from the code in ThreadingMixIn. - """ try: - server.finish_request(request, client_address) - server.close_request(request) - except: # pylint: disable-msg=W0702 - server.handle_error(request, client_address) - server.close_request(request) + server.request_handler_class(request, client_address, server) + finally: + request.close() -class IOServer(SocketServer.UnixStreamServer): - """IO thread class. +class MasterServer(daemon.AsyncStreamServer): + """Master Server. - This class takes care of initializing the other threads, setting - signal handlers (which are processed only in this thread), and doing - cleanup at shutdown. + This is the main asynchronous master server. It handles connections to the + master socket. 
""" - def __init__(self, address, rqhandler): - """IOServer constructor + def __init__(self, mainloop, address, handler_class): + """MasterServer constructor - @param address: the address to bind this IOServer to - @param rqhandler: RequestHandler type object + @type mainloop: ganeti.daemon.Mainloop + @param mainloop: Mainloop used to poll for I/O events + @param address: the unix socket address to bind the MasterServer to + @param handler_class: handler class for the connections """ - SocketServer.UnixStreamServer.__init__(self, address, rqhandler) + daemon.AsyncStreamServer.__init__(self, socket.AF_UNIX, address) + self.request_handler_class = handler_class + self.mainloop = mainloop # We'll only start threads once we've forked. self.context = None self.request_workers = None + def handle_connection(self, connected_socket, client_address): + self.request_workers.AddTask(self, connected_socket, client_address) + def setup_queue(self): self.context = GanetiContext() self.request_workers = workerpool.WorkerPool("ClientReq", CLIENT_REQUEST_WORKERS, ClientRequestWorker) - def process_request(self, request, client_address): - """Add task to workerpool to process request. - - """ - (pid, uid, gid) = utils.GetSocketCredentials(request) - logging.info("Accepted connection from pid=%s, uid=%s, gid=%s", - pid, uid, gid) - - self.request_workers.AddTask(self, request, client_address) - - def handle_error(self, request, client_address): - logging.exception("Error while handling request") - - @utils.SignalHandled([signal.SIGINT, signal.SIGTERM]) - def serve_forever(self, signal_handlers=None): # pylint: disable-msg=W0221 - """Handle one request at a time until told to quit.""" - assert isinstance(signal_handlers, dict) and \ - len(signal_handlers) > 0, \ - "Broken SignalHandled decorator" - # Since we use SignalHandled only once, the resulting dict will map all - # signals to the same handler. We'll just use the first one. 
- sighandler = signal_handlers.values()[0] - while not sighandler.called: - self.handle_request() - def server_cleanup(self): """Cleanup the server. @@ -136,7 +113,7 @@ class IOServer(SocketServer.UnixStreamServer): """ try: - self.server_close() + self.close() finally: if self.request_workers: self.request_workers.TerminateWorkers() @@ -528,7 +505,8 @@ def ExecMasterd (options, args): # pylint: disable-msg=W0613 # concurrent execution. utils.RemoveFile(constants.MASTER_SOCKET) - master = IOServer(constants.MASTER_SOCKET, ClientRqHandler) + mainloop = daemon.Mainloop() + master = MasterServer(mainloop, constants.MASTER_SOCKET, ClientRqHandler) try: rpc.Init() try: @@ -541,7 +519,7 @@ def ExecMasterd (options, args): # pylint: disable-msg=W0613 master.setup_queue() try: - master.serve_forever() + mainloop.Run() finally: master.server_cleanup() finally: -- 1.7.1
