Each luxi connection now creates an asyncore-based MasterClientHandler (an AsyncTerminatedMessageStream subclass that hands each received message to a client worker). This makes it harder to DoS the master daemon simply by opening luxi connections: each connection still uses memory and a file descriptor, but no longer a dedicated thread.
More work on wait-for-job-changes is needed before the CLIENT_REQUEST_WORKERS size can be reduced. Signed-off-by: Guido Trotter <[email protected]> --- daemons/ganeti-masterd | 102 ++++++++++++++++++++--------------------------- 1 files changed, 43 insertions(+), 59 deletions(-) diff --git a/daemons/ganeti-masterd b/daemons/ganeti-masterd index 7dcc0b4..b46994c 100755 --- a/daemons/ganeti-masterd +++ b/daemons/ganeti-masterd @@ -31,9 +31,7 @@ inheritance from parent classes requires it. import sys import socket -import SocketServer import time -import collections import logging from optparse import OptionParser @@ -62,14 +60,50 @@ EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR class ClientRequestWorker(workerpool.BaseWorker): # pylint: disable-msg=W0221 - def RunTask(self, server, request, client_address): + def RunTask(self, server, message, client): """Process the request. """ + client_ops = ClientOps(server) + try: - server.request_handler_class(request, client_address, server) - finally: - request.close() + (method, args) = luxi.ParseRequest(message) + except luxi.ProtocolError, err: + logging.error("Protocol Error: %s", err) + client.close_log() + return + + success = False + try: + result = client_ops.handle_request(method, args) + success = True + except errors.GenericError, err: + logging.exception("Unexpected exception") + success = False + result = errors.EncodeException(err) + except: + logging.exception("Unexpected exception") + err = sys.exc_info() + result = "Caught exception: %s" % str(err[1]) + + # FIXME: send data via push :) + client.connected_socket.sendall(luxi.FormatResponse(success, result) + + constants.LUXI_EOM) + #client.push(serializer.DumpJson(response) + constants.LUXI_EOM) + + +class MasterClientHandler(daemon.AsyncTerminatedMessageStream): + """Handler for master peers. 
+ + """ + def __init__(self, server, connected_socket, client_address): + daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket, + client_address, + constants.LUXI_EOM) + self.server = server + + def handle_message(self, message): + self.server.request_workers.AddTask(self.server, message, self) class MasterServer(daemon.AsyncStreamServer): @@ -79,17 +113,15 @@ class MasterServer(daemon.AsyncStreamServer): master socket. """ - def __init__(self, mainloop, address, handler_class): + def __init__(self, mainloop, address): """MasterServer constructor @type mainloop: ganeti.daemon.Mainloop @param mainloop: Mainloop used to poll for I/O events @param address: the unix socket address to bind the MasterServer to - @param handler_class: handler class for the connections """ daemon.AsyncStreamServer.__init__(self, socket.AF_UNIX, address) - self.request_handler_class = handler_class self.mainloop = mainloop # We'll only start threads once we've forked. @@ -97,7 +129,7 @@ class MasterServer(daemon.AsyncStreamServer): self.request_workers = None def handle_connection(self, connected_socket, client_address): - self.request_workers.AddTask(self, connected_socket, client_address) + MasterClientHandler(self, connected_socket, self.family, client_address) def setup_queue(self): self.context = GanetiContext() @@ -121,54 +153,6 @@ class MasterServer(daemon.AsyncStreamServer): self.context.jobqueue.Shutdown() -class ClientRqHandler(SocketServer.BaseRequestHandler): - """Client handler""" - READ_SIZE = 4096 - - def setup(self): - # pylint: disable-msg=W0201 - # setup() is the api for initialising for this class - self._buffer = "" - self._msgs = collections.deque() - self._ops = ClientOps(self.server) - - def handle(self): - while True: - msg = self.read_message() - if msg is None: - logging.debug("client closed connection") - break - - (method, args) = luxi.ParseRequest(msg) - - success = False - try: - result = self._ops.handle_request(method, args) - success = True - 
except errors.GenericError, err: - logging.exception("Unexpected exception") - result = errors.EncodeException(err) - except: - logging.exception("Unexpected exception") - result = "Caught exception: %s" % str(sys.exc_info()[1]) - - self.send_message(luxi.FormatResponse(success, result)) - - def read_message(self): - while not self._msgs: - data = self.request.recv(self.READ_SIZE) - if not data: - return None - new_msgs = (self._buffer + data).split(constants.LUXI_EOM) - self._buffer = new_msgs.pop() - self._msgs.extend(new_msgs) - return self._msgs.popleft() - - def send_message(self, msg): - # TODO: sendall is not guaranteed to send everything - self.request.sendall(msg + constants.LUXI_EOM) - - class ClientOps: """Class holding high-level client operations.""" def __init__(self, server): @@ -506,7 +490,7 @@ def ExecMasterd (options, args): # pylint: disable-msg=W0613 utils.RemoveFile(constants.MASTER_SOCKET) mainloop = daemon.Mainloop() - master = MasterServer(mainloop, constants.MASTER_SOCKET, ClientRqHandler) + master = MasterServer(mainloop, constants.MASTER_SOCKET) try: rpc.Init() try: -- 1.7.1
