This is the counterpart of the AsyncStreamServer can be used to handle
connected sockets returned from connected clients if the protocol is a
terminator separated message stream. Nothing in this class is server
specific though: it can be used as a client as well, if the client is an
asyncore daemon.

Signed-off-by: Guido Trotter <[email protected]>
---
 lib/daemon.py |   63 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 63 insertions(+), 0 deletions(-)

diff --git a/lib/daemon.py b/lib/daemon.py
index 1a0a1e2..f462e43 100644
--- a/lib/daemon.py
+++ b/lib/daemon.py
@@ -23,6 +23,7 @@
 
 
 import asyncore
+import asynchat
 import os
 import signal
 import logging
@@ -158,6 +159,68 @@ class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
       return str(address)
 
 
+class AsyncTerminatedMessageStream(asynchat.async_chat):
+  """A terminator separated message stream asyncore module.
+
+  Handles a stream connection receiving messages terminated by a defined
+  separator. For each complete message handle_message is called.
+
+  """
+  def __init__(self, connected_socket, peer_address, terminator):
+    """AsyncTerminatedMessageStream constructor.
+
+    @type connected_socket: socket.socket
+    @param connected_socket: connected stream socket to receive messages from
+    @param peer_address: family-specific peer address
+    @type terminator: string
+    @param terminator: terminator separating messages in the stream
+
+    """
+    # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
+    # using a positional argument rather than a keyword one.
+    asynchat.async_chat.__init__(self, connected_socket)
+    self.connected_socket = connected_socket
+    self.family = self.connected_socket.family
+    self.peer_address = peer_address
+    self.terminator = terminator
+    self.set_terminator(terminator)
+    self.ibuffer = []
+
+  # this method is overriding an asynchat.async_chat method
+  def collect_incoming_data(self, data):
+    self.ibuffer.append(data)
+
+  # this method is overriding an asynchat.async_chat method
+  def found_terminator(self):
+    message = "".join(self.ibuffer)
+    self.ibuffer = []
+    self.handle_message(message)
+
+  def handle_message(self, message):
+    """Handle a terminated message.
+
+    """
+    raise NotImplementedError
+
+  def close_log(self):
+    logging.info("Closing connection from %s",
+                   AsyncStreamServer.format_address(self.family,
+                                                    self.peer_address))
+    self.close()
+
+  # this method is overriding an asyncore.dispatcher method
+  def handle_expt(self):
+    self.close_log()
+
+  # this method is overriding an asyncore.dispatcher method
+  def handle_error(self):
+    """Log an error in handling any request, and proceed.
+
+    """
+    logging.exception("Error while handling asyncore request")
+    self.close_log()
+
+
 class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
   """An improved asyncore udp socket.
 
-- 
1.7.1

Reply via email to