Modified: qpid/proton/branches/examples/tutorial/proton_events.py URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/proton_events.py?rev=1631772&r1=1631771&r2=1631772&view=diff ============================================================================== --- qpid/proton/branches/examples/tutorial/proton_events.py (original) +++ qpid/proton/branches/examples/tutorial/proton_events.py Tue Oct 14 14:35:39 2014 @@ -18,44 +18,9 @@ # import heapq, os, Queue, re, socket, time, types from proton import Collector, Connection, Delivery, Endpoint, Event, Timeout -from proton import Message, ProtonException, Transport, TransportException, ConnectionException +from proton import Message, Handler, ProtonException, Transport, TransportException, ConnectionException from select import select -class EventDispatcher(object): - - methods = { - Event.CONNECTION_INIT: "on_connection_init", - Event.CONNECTION_OPEN: "on_connection_local_open", - Event.CONNECTION_REMOTE_OPEN: "on_connection_open", - Event.CONNECTION_CLOSE: "on_connection_local_close", - Event.CONNECTION_REMOTE_CLOSE: "on_connection_close", - Event.CONNECTION_FINAL: "on_connection_final", - - Event.SESSION_INIT: "on_session_init", - Event.SESSION_OPEN: "on_session_open", - Event.SESSION_REMOTE_OPEN: "on_session_open", - Event.SESSION_CLOSE: "on_session_local_close", - Event.SESSION_REMOTE_CLOSE: "on_session_close", - Event.SESSION_FINAL: "on_session_final", - - Event.LINK_INIT: "on_link_init", - Event.LINK_OPEN: "on_link_local_open", - Event.LINK_REMOTE_OPEN: "on_link_open", - Event.LINK_CLOSE: "on_link_local_close", - Event.LINK_REMOTE_CLOSE: "on_link_close", - Event.LINK_FLOW: "on_link_flow", - Event.LINK_FINAL: "on_link_final", - - Event.TRANSPORT: "on_transport", - Event.DELIVERY: "on_delivery" - } - - def dispatch(self, event): - getattr(self, self.methods.get(event.type, "on_%s" % str(event.type)), self.unhandled)(event) - - def unhandled(self, event): - pass - class AmqpConnection(object): def __init__(self, conn, sock, events): @@ -270,7 +235,7 @@ class Events(object): def dispatch(self, event): for d in self.dispatchers: - d.dispatch(event) + event.dispatch(d) @property def next_interval(self): @@ -280,9 +245,14 @@ class Events(object): def empty(self): return self.collector.peek() == None +class ExtendedEventType(object): + def __init__(self, name): + self.name = name + self.method = "on_%s" % name + class ApplicationEvent(Event): - def __init__(self, type, connection=None, session=None, link=None, delivery=None, subject=None): - self.type = type + def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None): + self.type = ExtendedEventType(typename) self.subject = subject if delivery: self.context = delivery @@ -302,7 +272,7 @@ class ApplicationEvent(Event): def __repr__(self): objects = [self.context, self.subject] - return "%s(%s)" % (self.type, + return "%s(%s)" % (self.type.name, ", ".join([str(o) for o in objects if o is not None])) class ScheduledEvents(Events): @@ -399,41 +369,41 @@ class SelectLoop(object): return False -class Handshaker(EventDispatcher): +class Handshaker(Handler): - def on_connection_open(self, event): + def on_connection_remote_open(self, event): conn = event.connection if conn.state & Endpoint.LOCAL_UNINIT: conn.open() - def on_session_open(self, event): + def on_session_remote_open(self, event): ssn = event.session if ssn.state & Endpoint.LOCAL_UNINIT: ssn.open() - def on_link_open(self, event): + def on_link_remote_open(self, event): link = event.link if link.state & Endpoint.LOCAL_UNINIT: link.source.copy(link.remote_source) link.target.copy(link.remote_target) link.open() - def on_connection_close(self, event): + def on_connection_remote_close(self, event): conn = event.connection if not (conn.state & Endpoint.LOCAL_CLOSED): conn.close() - def on_session_close(self, event): + def on_session_remote_close(self, event): ssn = event.session if not (ssn.state & Endpoint.LOCAL_CLOSED): ssn.close() - def on_link_close(self, event): + def on_link_remote_close(self, event): link = event.link if not (link.state & Endpoint.LOCAL_CLOSED): link.close() -class FlowController(EventDispatcher): +class FlowController(Handler): def __init__(self, window=1): self.window = window @@ -446,7 +416,7 @@ class FlowController(EventDispatcher): if event.link.is_receiver: self.top_up(event.link) - def on_link_open(self, event): + def on_link_remote_open(self, event): if event.link.is_receiver: self.top_up(event.link) @@ -458,7 +428,7 @@ class FlowController(EventDispatcher): if not event.delivery.released and event.delivery.link.is_receiver: self.top_up(event.delivery.link) -class ScopedDispatcher(EventDispatcher): +class ScopedHandler(Handler): scopes = { "pn_connection": ["connection"], @@ -467,59 +437,17 @@ class ScopedDispatcher(EventDispatcher): "pn_delivery": ["delivery", "link", "session", "connection"] } - def dispatch(self, event): + def on_unhandled(self, event): if event.type in [Event.CONNECTION_FINAL, Event.SESSION_FINAL, Event.LINK_FINAL]: return - method = self.methods.get(event.type, "on_%s" % str(event.type)) objects = [getattr(event, attr) for attr in self.scopes.get(event.clazz, [])] targets = [getattr(o, "context") for o in objects if hasattr(o, "context")] - handlers = [getattr(t, method) for t in targets if hasattr(t, method)] + handlers = [getattr(t, event.type.method) for t in targets if hasattr(t, event.type.method)] for h in handlers: h(event) -class ErrorHandler(EventDispatcher): - def was_closed_by_peer(self, endpoint, parent=None): - if parent: - return self.was_closed_by_peer(parent) and self.was_closed_by_peer(endpoint) - else: - return endpoint.state & Endpoint.LOCAL_ACTIVE and endpoint.state & Endpoint.REMOTE_CLOSED - - def treat_as_error(self, endpoint, parent=None): - return endpoint.remote_condition or self.was_closed_by_peer(endpoint, parent) - - def print_error(self, endpoint, endpoint_type): - if endpoint.remote_condition: - print endpoint.remote_condition.description - elif self.was_closed_by_peer(endpoint): - print "%s closed by peer" % endpoint_type - - def on_link_close(self, event): - if self.treat_as_error(event.link, event.connection): - self.on_link_error(event) +class OutgoingMessageHandler(Handler): - def on_session_close(self, event): - if self.treat_as_error(event.session, event.connection): - self.on_session_error(event) - - def on_connection_close(self, event): - if self.treat_as_error(event.connection): - self.on_connection_error(event) - - def on_connection_error(self, event): - self.print_error(event.connection, "connection") - event.connection.close() - - def on_session_error(self, event): - self.print_error(event.session, "session") - event.session.close() - event.connection.close() - - def on_link_error(self, event): - self.print_error(event.link, "link") - event.link.close() - event.connection.close() - -class OutgoingMessageHandler(EventDispatcher): def on_link_flow(self, event): if event.link.is_sender and event.link.credit: self.on_credit(event) @@ -561,7 +489,7 @@ class Reject(ProtonException): """ pass -class IncomingMessageHandler(EventDispatcher): +class IncomingMessageHandler(Handler): def on_delivery(self, event): dlv = event.delivery if dlv.released or not dlv.link.is_receiver: return @@ -599,9 +527,110 @@ class IncomingMessageHandler(EventDispat def on_settled(self, event): pass def auto_accept(self): return True -class BaseHandler(ErrorHandler, IncomingMessageHandler, OutgoingMessageHandler): +class ClientEndpointHandler(Handler): + + def is_local_open(self, endpoint): + return endpoint.state & Endpoint.LOCAL_ACTIVE + + def is_remote_open(self, endpoint): + return endpoint.state & Endpoint.REMOTE_ACTIVE + + def is_remote_closed(self, endpoint): + return endpoint.state & Endpoint.REMOTE_CLOSED + + def was_closed_by_peer(self, endpoint, parent=None): + if parent: + return self.was_closed_by_peer(parent) and self.was_closed_by_peer(endpoint) + else: + return self.is_local_open(endpoint) and self.is_remote_closed(endpoint) + + def treat_as_error(self, endpoint, parent=None): + return endpoint.remote_condition or self.was_closed_by_peer(endpoint, parent) + + def print_error(self, endpoint, endpoint_type): + if endpoint.remote_condition: + print endpoint.remote_condition.description + elif self.was_closed_by_peer(endpoint): + print "%s closed by peer" % endpoint_type + + def on_link_remote_close(self, event): + if self.treat_as_error(event.link, event.connection): + self.on_link_error(event) + else: + self.on_link_closed(event) + + def on_session_remote_close(self, event): + if self.treat_as_error(event.session, event.connection): + self.on_session_error(event) + else: + self.on_session_closed(event) + + def on_connection_remote_close(self, event): + if self.treat_as_error(event.connection): + self.on_connection_error(event) + else: + self.on_connection_closed(event) + + def on_connection_error(self, event): + self.print_error(event.connection, "connection") + event.connection.close() + + def on_session_error(self, event): + self.print_error(event.session, "session") + event.session.close() + event.connection.close() + + def on_link_error(self, event): + self.print_error(event.link, "link") + event.link.close() + event.connection.close() + + def on_connection_local_open(self, event): + if self.is_remote_open(event.connection): + self.on_connection_opened(event) + + def on_connection_remote_open(self, event): + if self.is_local_open(event.connection): + self.on_connection_opened(event) + + def on_session_local_open(self, event): + if self.is_remote_open(event.session): + self.on_session_opened(event) + + def on_session_remote_open(self, event): + if self.is_local_open(event.session): + self.on_session_opened(event) + + def on_link_local_open(self, event): + if self.is_remote_open(event.link): + self.on_link_opened(event) + + def on_link_remote_open(self, event): + if self.is_local_open(event.link): + self.on_link_opened(event) + + def on_connection_opened(self, event): + pass + + def on_session_opened(self, event): + pass + + def on_link_opened(self, event): + pass + + def on_connection_closed(self, event): + pass + + def on_session_closed(self, event): + pass + + def on_link_closed(self, event): + pass + +class ClientHandler(ClientEndpointHandler, IncomingMessageHandler, OutgoingMessageHandler): + def __init__(self): - super(BaseHandler, self).__init__() + super(ClientHandler, self).__init__() def on_delivery(self, event): IncomingMessageHandler.on_delivery(self, event) @@ -689,11 +718,11 @@ class MessagingContext(object): ssn.open() return ssn - def on_session_close(self, event): + def on_session_remote_close(self, event): if self.conn: self.conn.close() -class Connector(EventDispatcher): +class Connector(Handler): def attach_to(self, loop): self.loop = loop @@ -706,7 +735,7 @@ class Connector(EventDispatcher): if hasattr(event.connection, "address"): self._connect(event.connection) - def on_connection_open(self, event): + def on_connection_remote_open(self, event): if hasattr(event.connection, "reconnect"): event.connection.reconnect.reset() @@ -816,9 +845,10 @@ class Urls(object): class EventLoop(object): def __init__(self, *handlers): self.connector = Connector() - l = [ScopedDispatcher(), self.connector] - if handlers: l += handlers - else: l.append(FlowController(10)) + if handlers: + l = handlers + (self.connector, ScopedHandler()) + else: + l = [FlowController(10), self.connector, ScopedHandler()] self.events = ScheduledEvents(*l) self.loop = SelectLoop(self.events) self.connector.attach_to(self) @@ -904,10 +934,10 @@ class BlockingReceiver(BlockingLink): super(BlockingReceiver, self).__init__(connection, receiver) if credit: receiver.flow(credit) -class BlockingConnection(EventDispatcher): +class BlockingConnection(Handler): def __init__(self, url, timeout=None): self.timeout = timeout - self.events = Events(ScopedDispatcher()) + self.events = Events(ScopedHandler()) self.loop = SelectLoop(self.events) self.context = MessagingContext(self.loop.events.connection(), handler=self) if isinstance(url, basestring): @@ -951,11 +981,11 @@ class BlockingConnection(EventDispatcher if msg: txt += ": " + msg raise Timeout(txt) - def on_link_close(self, event): + def on_link_remote_close(self, event): if event.link.state & Endpoint.LOCAL_ACTIVE: self.closed(event.link.remote_condition) - def on_connection_close(self, event): + def on_connection_remote_close(self, event): if event.connection.state & Endpoint.LOCAL_ACTIVE: self.closed(event.connection.remote_condition)
Modified: qpid/proton/branches/examples/tutorial/proton_tornado.py URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/proton_tornado.py?rev=1631772&r1=1631771&r2=1631772&view=diff ============================================================================== --- qpid/proton/branches/examples/tutorial/proton_tornado.py (original) +++ qpid/proton/branches/examples/tutorial/proton_tornado.py Tue Oct 14 14:35:39 2014 @@ -18,7 +18,7 @@ # under the License. # -from proton_events import ApplicationEvent, Connector, EventLoop, Events, FlowController, MessagingContext, ScopedDispatcher, Url +from proton_events import ApplicationEvent, EventLoop import tornado.ioloop class TornadoLoop(EventLoop): Modified: qpid/proton/branches/examples/tutorial/recurring_timer.py URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/recurring_timer.py?rev=1631772&r1=1631771&r2=1631772&view=diff ============================================================================== --- qpid/proton/branches/examples/tutorial/recurring_timer.py (original) +++ qpid/proton/branches/examples/tutorial/recurring_timer.py Tue Oct 14 14:35:39 2014 @@ -19,9 +19,9 @@ # import time -from proton_events import EventLoop, EventDispatcher +from proton_events import EventLoop, Handler -class Recurring(EventDispatcher): +class Recurring(Handler): def __init__(self, period): self.eventloop = EventLoop(self) self.period = period Modified: qpid/proton/branches/examples/tutorial/recurring_timer_tornado.py URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/recurring_timer_tornado.py?rev=1631772&r1=1631771&r2=1631772&view=diff ============================================================================== --- qpid/proton/branches/examples/tutorial/recurring_timer_tornado.py (original) +++ qpid/proton/branches/examples/tutorial/recurring_timer_tornado.py Tue Oct 14 14:35:39 2014 @@ -19,10 +19,10 @@ # import time -from proton_events import EventDispatcher +from proton_events import Handler from proton_tornado import TornadoLoop -class Recurring(EventDispatcher): +class Recurring(Handler): def __init__(self, period): self.eventloop = TornadoLoop(self) self.period = period Modified: qpid/proton/branches/examples/tutorial/server.py URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/server.py?rev=1631772&r1=1631771&r2=1631772&view=diff ============================================================================== --- qpid/proton/branches/examples/tutorial/server.py (original) +++ qpid/proton/branches/examples/tutorial/server.py Tue Oct 14 14:35:39 2014 @@ -19,9 +19,9 @@ # from proton import Message -from proton_events import EventLoop, IncomingMessageHandler +from proton_events import EventLoop, ClientHandler -class Server(IncomingMessageHandler): +class Server(ClientHandler): def __init__(self, eventloop, host, address): self.eventloop = eventloop self.conn = eventloop.connect(host, handler=self) @@ -38,14 +38,10 @@ class Server(IncomingMessageHandler): self.senders[event.message.reply_to] = sender sender.send_msg(Message(address=event.message.reply_to, body=event.message.body.upper())) - def on_connection_open(self, event): + def on_connection_opened(self, event): if event.connection.remote_offered_capabilities and 'ANONYMOUS-RELAY' in event.connection.remote_offered_capabilities: self.relay = self.conn.create_sender(None) - def on_connection_close(self, endpoint, error): - if error: print "Closed due to %s" % error - self.conn.close() - def run(self): self.eventloop.run() Modified: qpid/proton/branches/examples/tutorial/simple_recv.py URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/simple_recv.py?rev=1631772&r1=1631771&r2=1631772&view=diff ============================================================================== --- qpid/proton/branches/examples/tutorial/simple_recv.py (original) +++ qpid/proton/branches/examples/tutorial/simple_recv.py Tue Oct 14 14:35:39 2014 @@ -20,7 +20,7 @@ import proton_events -class Recv(proton_events.BaseHandler): +class Recv(proton_events.ClientHandler): def on_message(self, event): print event.message.body Modified: qpid/proton/branches/examples/tutorial/simple_send.py URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/simple_send.py?rev=1631772&r1=1631771&r2=1631772&view=diff ============================================================================== --- qpid/proton/branches/examples/tutorial/simple_send.py (original) +++ qpid/proton/branches/examples/tutorial/simple_send.py Tue Oct 14 14:35:39 2014 @@ -21,7 +21,7 @@ from proton import Message import proton_events -class Send(proton_events.BaseHandler): +class Send(proton_events.ClientHandler): def __init__(self, messages): self.sent = 0 self.confirmed = 0 --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
