This is the counterpart of the AsyncStreamServer, and can be used to handle connected sockets returned from connected clients if the protocol is a terminator-separated message stream. Nothing in this class is server-specific, though: it can be used as a client as well, if the client is an asyncore daemon.
Signed-off-by: Guido Trotter <[email protected]> --- lib/daemon.py | 63 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 63 insertions(+), 0 deletions(-) diff --git a/lib/daemon.py b/lib/daemon.py index 1a0a1e2..f462e43 100644 --- a/lib/daemon.py +++ b/lib/daemon.py @@ -23,6 +23,7 @@ import asyncore +import asynchat import os import signal import logging @@ -158,6 +159,68 @@ class AsyncStreamServer(GanetiBaseAsyncoreDispatcher): return str(address) +class AsyncTerminatedMessageStream(asynchat.async_chat): + """A terminator separated message stream asyncore module. + + Handles a stream connection receiving messages terminated by a defined + separator. For each complete message handle_message is called. + + """ + def __init__(self, connected_socket, peer_address, terminator): + """AsyncTerminatedMessageStream constructor. + + @type connected_socket: socket.socket + @param connected_socket: connected stream socket to receive messages from + @param peer_address: family-specific peer address + @type terminator: string + @param terminator: terminator separating messages in the stream + + """ + # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by + # using a positional argument rather than a keyword one. + asynchat.async_chat.__init__(self, connected_socket) + self.connected_socket = connected_socket + self.family = self.connected_socket.family + self.peer_address = peer_address + self.terminator = terminator + self.set_terminator(terminator) + self.ibuffer = [] + + # this method is overriding an asynchat.async_chat method + def collect_incoming_data(self, data): + self.ibuffer.append(data) + + # this method is overriding an asynchat.async_chat method + def found_terminator(self): + message = "".join(self.ibuffer) + self.ibuffer = [] + self.handle_message(message) + + def handle_message(self, message): + """Handle a terminated message. 
+ + """ + raise NotImplementedError + + def close_log(self): + logging.info("Closing connection from %s", + AsyncStreamServer.format_address(self.family, + self.peer_address)) + self.close() + + # this method is overriding an asyncore.dispatcher method + def handle_expt(self): + self.close_log() + + # this method is overriding an asyncore.dispatcher method + def handle_error(self): + """Log an error in handling any request, and proceed. + + """ + logging.exception("Error while handling asyncore request") + self.close_log() + + class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher): """An improved asyncore udp socket. -- 1.7.1
