Not much changes with this patch. The main loop for the IOServer is
repaced by mainloop.Run() and the main thread now uses asyncore to
handle connections to the master socket. Once it accepts them, though,
it just pushes them to the current infrastructure, and everything
proceeds as before.

Signed-off-by: Guido Trotter <[email protected]>
---
 daemons/ganeti-masterd |   70 ++++++++++++++++-------------------------------
 1 files changed, 24 insertions(+), 46 deletions(-)

diff --git a/daemons/ganeti-masterd b/daemons/ganeti-masterd
index 24fa827..7dcc0b4 100755
--- a/daemons/ganeti-masterd
+++ b/daemons/ganeti-masterd
@@ -30,10 +30,10 @@ inheritance from parent classes requires it.
 # C0103: Invalid name ganeti-masterd
 
 import sys
+import socket
 import SocketServer
 import time
 import collections
-import signal
 import logging
 
 from optparse import OptionParser
@@ -65,69 +65,46 @@ class ClientRequestWorker(workerpool.BaseWorker):
   def RunTask(self, server, request, client_address):
     """Process the request.
 
-    This is copied from the code in ThreadingMixIn.
-
     """
     try:
-      server.finish_request(request, client_address)
-      server.close_request(request)
-    except: # pylint: disable-msg=W0702
-      server.handle_error(request, client_address)
-      server.close_request(request)
+      server.request_handler_class(request, client_address, server)
+    finally:
+      request.close()
 
 
-class IOServer(SocketServer.UnixStreamServer):
-  """IO thread class.
+class MasterServer(daemon.AsyncStreamServer):
+  """Master Server.
 
-  This class takes care of initializing the other threads, setting
-  signal handlers (which are processed only in this thread), and doing
-  cleanup at shutdown.
+  This is the main asynchronous master server. It handles connections to the
+  master socket.
 
   """
-  def __init__(self, address, rqhandler):
-    """IOServer constructor
+  def __init__(self, mainloop, address, handler_class):
+    """MasterServer constructor
 
-    @param address: the address to bind this IOServer to
-    @param rqhandler: RequestHandler type object
+    @type mainloop: ganeti.daemon.Mainloop
+    @param mainloop: Mainloop used to poll for I/O events
+    @param address: the unix socket address to bind the MasterServer to
+    @param handler_class: handler class for the connections
 
     """
-    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
+    daemon.AsyncStreamServer.__init__(self, socket.AF_UNIX, address)
+    self.request_handler_class = handler_class
+    self.mainloop = mainloop
 
     # We'll only start threads once we've forked.
     self.context = None
     self.request_workers = None
 
+  def handle_connection(self, connected_socket, client_address):
+    self.request_workers.AddTask(self, connected_socket, client_address)
+
   def setup_queue(self):
     self.context = GanetiContext()
     self.request_workers = workerpool.WorkerPool("ClientReq",
                                                  CLIENT_REQUEST_WORKERS,
                                                  ClientRequestWorker)
 
-  def process_request(self, request, client_address):
-    """Add task to workerpool to process request.
-
-    """
-    (pid, uid, gid) = utils.GetSocketCredentials(request)
-    logging.info("Accepted connection from pid=%s, uid=%s, gid=%s",
-                 pid, uid, gid)
-
-    self.request_workers.AddTask(self, request, client_address)
-
-  def handle_error(self, request, client_address):
-    logging.exception("Error while handling request")
-
-  @utils.SignalHandled([signal.SIGINT, signal.SIGTERM])
-  def serve_forever(self, signal_handlers=None): # pylint: disable-msg=W0221
-    """Handle one request at a time until told to quit."""
-    assert isinstance(signal_handlers, dict) and \
-           len(signal_handlers) > 0, \
-           "Broken SignalHandled decorator"
-    # Since we use SignalHandled only once, the resulting dict will map all
-    # signals to the same handler. We'll just use the first one.
-    sighandler = signal_handlers.values()[0]
-    while not sighandler.called:
-      self.handle_request()
-
   def server_cleanup(self):
     """Cleanup the server.
 
@@ -136,7 +113,7 @@ class IOServer(SocketServer.UnixStreamServer):
 
     """
     try:
-      self.server_close()
+      self.close()
     finally:
       if self.request_workers:
         self.request_workers.TerminateWorkers()
@@ -528,7 +505,8 @@ def ExecMasterd (options, args): # pylint: disable-msg=W0613
   # concurrent execution.
   utils.RemoveFile(constants.MASTER_SOCKET)
 
-  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
+  mainloop = daemon.Mainloop()
+  master = MasterServer(mainloop, constants.MASTER_SOCKET, ClientRqHandler)
   try:
     rpc.Init()
     try:
@@ -541,7 +519,7 @@ def ExecMasterd (options, args): # pylint: disable-msg=W0613
 
       master.setup_queue()
       try:
-        master.serve_forever()
+        mainloop.Run()
       finally:
         master.server_cleanup()
     finally:
-- 
1.7.1

Reply via email to