Each luxi connection now creates an asyncore MasterClientHandler (which
is an AsyncTerminatedMessageStream subclass, sending each message to a
client worker). This makes it harder to DoS the master daemon by just
creating luxi connections, as each of them will use memory and file
descriptors, but not a dedicated thread.

More work on wait-for-job-changes is needed before the
CLIENT_REQUEST_WORKERS size can be reduced.

Signed-off-by: Guido Trotter <[email protected]>
---
 daemons/ganeti-masterd |  102 ++++++++++++++++++++---------------------------
 1 files changed, 43 insertions(+), 59 deletions(-)

diff --git a/daemons/ganeti-masterd b/daemons/ganeti-masterd
index 7dcc0b4..b46994c 100755
--- a/daemons/ganeti-masterd
+++ b/daemons/ganeti-masterd
@@ -31,9 +31,7 @@ inheritance from parent classes requires it.
 
 import sys
 import socket
-import SocketServer
 import time
-import collections
 import logging
 
 from optparse import OptionParser
@@ -62,14 +60,50 @@ EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
 
 class ClientRequestWorker(workerpool.BaseWorker):
    # pylint: disable-msg=W0221
-  def RunTask(self, server, request, client_address):
+  def RunTask(self, server, message, client):
     """Process the request.
 
     """
+    client_ops = ClientOps(server)
+
     try:
-      server.request_handler_class(request, client_address, server)
-    finally:
-      request.close()
+      (method, args) = luxi.ParseRequest(message)
+    except luxi.ProtocolError, err:
+      logging.error("Protocol Error: %s", err)
+      client.close_log()
+      return
+
+    success = False
+    try:
+      result = client_ops.handle_request(method, args)
+      success = True
+    except errors.GenericError, err:
+      logging.exception("Unexpected exception")
+      success = False
+      result = errors.EncodeException(err)
+    except:
+      logging.exception("Unexpected exception")
+      err = sys.exc_info()
+      result = "Caught exception: %s" % str(err[1])
+
+    # FIXME: send data via push :)
+    client.connected_socket.sendall(luxi.FormatResponse(success, result)
+                                    + constants.LUXI_EOM)
+    #client.push(serializer.DumpJson(response) + constants.LUXI_EOM)
+
+
+class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
+  """Handler for master peers.
+
+  """
+  def __init__(self, server, connected_socket, client_address):
+    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
+                                                 client_address,
+                                                 constants.LUXI_EOM)
+    self.server = server
+
+  def handle_message(self, message):
+    self.server.request_workers.AddTask(self.server, message, self)
 
 
 class MasterServer(daemon.AsyncStreamServer):
@@ -79,17 +113,15 @@ class MasterServer(daemon.AsyncStreamServer):
   master socket.
 
   """
-  def __init__(self, mainloop, address, handler_class):
+  def __init__(self, mainloop, address):
     """MasterServer constructor
 
     @type mainloop: ganeti.daemon.Mainloop
     @param mainloop: Mainloop used to poll for I/O events
     @param address: the unix socket address to bind the MasterServer to
-    @param handler_class: handler class for the connections
 
     """
     daemon.AsyncStreamServer.__init__(self, socket.AF_UNIX, address)
-    self.request_handler_class = handler_class
     self.mainloop = mainloop
 
     # We'll only start threads once we've forked.
@@ -97,7 +129,7 @@ class MasterServer(daemon.AsyncStreamServer):
     self.request_workers = None
 
   def handle_connection(self, connected_socket, client_address):
-    self.request_workers.AddTask(self, connected_socket, client_address)
+    MasterClientHandler(self, connected_socket, client_address)
 
   def setup_queue(self):
     self.context = GanetiContext()
@@ -121,54 +153,6 @@ class MasterServer(daemon.AsyncStreamServer):
         self.context.jobqueue.Shutdown()
 
 
-class ClientRqHandler(SocketServer.BaseRequestHandler):
-  """Client handler"""
-  READ_SIZE = 4096
-
-  def setup(self):
-    # pylint: disable-msg=W0201
-    # setup() is the api for initialising for this class
-    self._buffer = ""
-    self._msgs = collections.deque()
-    self._ops = ClientOps(self.server)
-
-  def handle(self):
-    while True:
-      msg = self.read_message()
-      if msg is None:
-        logging.debug("client closed connection")
-        break
-
-      (method, args) = luxi.ParseRequest(msg)
-
-      success = False
-      try:
-        result = self._ops.handle_request(method, args)
-        success = True
-      except errors.GenericError, err:
-        logging.exception("Unexpected exception")
-        result = errors.EncodeException(err)
-      except:
-        logging.exception("Unexpected exception")
-        result = "Caught exception: %s" % str(sys.exc_info()[1])
-
-      self.send_message(luxi.FormatResponse(success, result))
-
-  def read_message(self):
-    while not self._msgs:
-      data = self.request.recv(self.READ_SIZE)
-      if not data:
-        return None
-      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
-      self._buffer = new_msgs.pop()
-      self._msgs.extend(new_msgs)
-    return self._msgs.popleft()
-
-  def send_message(self, msg):
-    # TODO: sendall is not guaranteed to send everything
-    self.request.sendall(msg + constants.LUXI_EOM)
-
-
 class ClientOps:
   """Class holding high-level client operations."""
   def __init__(self, server):
@@ -506,7 +490,7 @@ def ExecMasterd (options, args): # pylint: disable-msg=W0613
   utils.RemoveFile(constants.MASTER_SOCKET)
 
   mainloop = daemon.Mainloop()
-  master = MasterServer(mainloop, constants.MASTER_SOCKET, ClientRqHandler)
+  master = MasterServer(mainloop, constants.MASTER_SOCKET)
   try:
     rpc.Init()
     try:
-- 
1.7.1

Reply via email to