Repository: qpid-proton Updated Branches: refs/heads/examples 9c87b3db5 -> d9c50b57d
Updated event injection and timer Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d9c50b57 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d9c50b57 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d9c50b57 Branch: refs/heads/examples Commit: d9c50b57dd5aa05d5ae44a88621fdd62bc222cc9 Parents: 9c87b3d Author: Gordon Sim <[email protected]> Authored: Thu Dec 4 21:29:00 2014 +0000 Committer: Gordon Sim <[email protected]> Committed: Thu Dec 4 21:29:00 2014 +0000 ---------------------------------------------------------------------- examples/engine/py/db_send.py | 14 +- proton-c/bindings/python/proton/__init__.py | 16 +- proton-c/bindings/python/proton/handlers.py | 11 +- proton-c/bindings/python/proton/reactors.py | 184 +++++++++++++---------- 4 files changed, 134 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d9c50b57/examples/engine/py/db_send.py ---------------------------------------------------------------------- diff --git a/examples/engine/py/db_send.py b/examples/engine/py/db_send.py index bc8d6da..ce3ce79 100755 --- a/examples/engine/py/db_send.py +++ b/examples/engine/py/db_send.py @@ -31,6 +31,7 @@ class Send(MessagingHandler): self.url = url self.delay = 0 self.sent = 0 + self.load_count = 0 self.records = Queue.Queue(maxsize=50) def on_start(self, event): @@ -39,16 +40,19 @@ class Send(MessagingHandler): self.sender = self.container.create_sender(self.url) def on_records_loaded(self, event): - if self.records.empty() and event.subject == self.sent: - print "Exhausted available data, waiting to recheck..." - # check for new data after 5 seconds - self.container.schedule(time.time() + 5, link=self.sender, subject="data") + if self.records.empty(): + if event.subject == self.load_count: + print "Exhausted available data, waiting to recheck..." + # check for new data after 5 seconds + self.container.schedule(time.time() + 5, link=self.sender, subject="data") else: self.send() def request_records(self): if not self.records.full(): - self.db.load(self.records, event=ApplicationEvent("records_loaded", link=self.sender, subject=self.sent)) + print "loading records..." + self.load_count += 1 + self.db.load(self.records, event=ApplicationEvent("records_loaded", link=self.sender, subject=self.load_count)) def on_credit(self, event): self.send() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d9c50b57/proton-c/bindings/python/proton/__init__.py ---------------------------------------------------------------------- diff --git a/proton-c/bindings/python/proton/__init__.py b/proton-c/bindings/python/proton/__init__.py index e375723..8d19fa3 100644 --- a/proton-c/bindings/python/proton/__init__.py +++ b/proton-c/bindings/python/proton/__init__.py @@ -3405,7 +3405,7 @@ class Collector: def __del__(self): pn_collector_free(self._impl) -class EventType: +class EventType(object): TYPES = {} @@ -3485,6 +3485,8 @@ class Event(object): return self.context.connection elif self.clazz == "pn_delivery" and not self.context.released: return self.context.link.connection + elif hasattr(self.context, 'connection'): + return self.context.connection else: return None @@ -3496,6 +3498,8 @@ class Event(object): return self.context.session elif self.clazz == "pn_delivery" and not self.context.released: return self.context.link.session + elif hasattr(self.context, 'session'): + return self.context.session else: return None @@ -3505,6 +3509,8 @@ class Event(object): return self.context elif self.clazz == "pn_delivery" and not self.context.released: return self.context.link + elif hasattr(self.context, 'link'): + return self.context.link else: return None @@ -3528,9 +3534,17 @@ class Event(object): def delivery(self): if self.clazz == "pn_delivery": return self.context + elif hasattr(self.context, 'delivery'): + return self.context.delivery else: return None + def __getattr__(self, name): + if hasattr(self.context, name): + return getattr(self.context, name) + else: + raise AttributeError + def __repr__(self): return "%s(%s)" % (self.type, self.context) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d9c50b57/proton-c/bindings/python/proton/handlers.py ---------------------------------------------------------------------- diff --git a/proton-c/bindings/python/proton/handlers.py b/proton-c/bindings/python/proton/handlers.py index 7837d64..71ab837 100644 --- a/proton-c/bindings/python/proton/handlers.py +++ b/proton-c/bindings/python/proton/handlers.py @@ -72,23 +72,20 @@ class ScopedHandler(Handler): or connection scoped handlers that will only be called with events for the object to which they are scoped. """ - scopes = { - "pn_connection": ["connection"], - "pn_session": ["session", "connection"], - "pn_link": ["link", "session", "connection"], - "pn_delivery": ["delivery", "link", "session", "connection"] - } + scopes = ["delivery", "link", "session", "connection"] def on_unhandled(self, method, args): event = args[0] if event.type in [Event.CONNECTION_FINAL, Event.SESSION_FINAL, Event.LINK_FINAL]: return - objects = [getattr(event, attr) for attr in self.scopes.get(event.clazz, [])] + + objects = [getattr(event, attr) for attr in self.scopes if hasattr(event, attr) and getattr(event, attr)] targets = [getattr(o, "context") for o in objects if hasattr(o, "context")] handlers = [getattr(t, event.type.method) for t in nested_handlers(targets) if hasattr(t, event.type.method)] for h in handlers: h(event) + class OutgoingMessageHandler(Handler): """ A utility for simpler and more intuitive handling of delivery http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d9c50b57/proton-c/bindings/python/proton/reactors.py ---------------------------------------------------------------------- diff --git a/proton-c/bindings/python/proton/reactors.py b/proton-c/bindings/python/proton/reactors.py index 16a87e4..52e64b6 100644 --- a/proton-c/bindings/python/proton/reactors.py +++ b/proton-c/bindings/python/proton/reactors.py @@ -16,10 +16,12 @@ # specific language governing permissions and limitations # under the License. # -import heapq, os, Queue, socket, time, types -from proton import dispatch, generate_uuid, PN_ACCEPTED, SASL, symbol, ulong, Url -from proton import Collector, Connection, Delivery, Described, Endpoint, Event, Link, Session, Terminus, Timeout -from proton import Message, Handler, ProtonException, Transport, TransportException, ConnectionException +import os, Queue, socket, time, types +from heapq import heappush, heappop, nsmallest +from proton import Collector, Connection, ConnectionException, Delivery, Described, dispatch +from proton import Endpoint, Event, EventType, generate_uuid, Handler, Link, Message +from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, symbol +from proton import Terminus, Timeout, Transport, TransportException, ulong, Url from select import select from proton.handlers import nested_handlers, ScopedHandler @@ -152,7 +154,7 @@ class AmqpSocket(object): def tick(self): t = self.transport.tick(time.time()) - if t: return t - time.time() + if t: return t else: return None class AmqpAcceptor: @@ -207,8 +209,8 @@ class EventInjector(object): external thread but handled on the event thread associated with the loop. """ - def __init__(self, events): - self.events = events + def __init__(self, collector): + self.collector = collector self.queue = Queue.Queue() self.pipe = os.pipe() self._closed = False @@ -235,14 +237,64 @@ class EventInjector(object): def readable(self): os.read(self.pipe[0], 512) while not self.queue.empty(): - self.events.dispatch(self.queue.get()) + event = self.queue.get() + self.collector.put(event.context, event.type) def removed(self): pass def tick(self): return None +class PQueue: + + def __init__(self): + self.entries = [] + + def add(self, priority, task): + heappush(self.entries, (priority, task)) + + def peek(self): + if self.entries: + return nsmallest(1, self.entries)[0] + else: + return None + + def pop(self): + if self.entries: + return heappop(self.entries) + else: + return None + + def __nonzero__(self): + if self.entries: + return True + else: + return False + +class Timer: + def __init__(self, collector): + self.collector = collector + self.events = PQueue() + + def schedule(self, deadline, event): + self.events.add(deadline, event) + + def tick(self): + while self.events: + deadline, event = self.events.peek() + if time.time() > deadline: + self.events.pop() + self.collector.put(event.context, event.type) + else: + return deadline + return None + + @property + def pending(self): + return bool(self.events) + class Events(object): def __init__(self, *handlers): self.collector = Collector() + self.timer = Timer(self.collector) self.handlers = handlers def connection(self): @@ -251,34 +303,61 @@ class Events(object): return conn def process(self): + result = False while True: ev = self.collector.peek() if ev: self.dispatch(ev) self.collector.pop() + result = True else: - return + return result def dispatch(self, event): for h in self.handlers: event.dispatch(h) @property - def next_interval(self): - return None - - @property def empty(self): - return self.collector.peek() == None + return self.collector.peek() == None and not self.timer.pending -class ExtendedEventType(object): +class Names(object): + def __init__(self, base=10000): + self.names = [] + self.base = base + + def number(self, name): + if name not in self.names: + self.names.append(name) + return self.names.index(name) + self.base + +class ExtendedEventType(EventType): + USED = Names() """ Event type identifier for events defined outside the proton-c library """ - def __init__(self, name): + def __init__(self, name, number=None): + super(ExtendedEventType, self).__init__(number or ExtendedEventType.USED.number(name), "on_%s" % name) self.name = name - self.method = "on_%s" % name + +class ApplicationEventContext(object): + def __init__(self, connection=None, session=None, link=None, delivery=None, subject=None): + self.connection = connection + self.session = session + self.link = link + self.delivery = delivery + if self.delivery: + self.link = self.delivery.link + if self.link: + self.session = self.link.session + if self.session: + self.connection = self.session.connection + self.subject = subject + + def __repr__(self): + objects = [self.connection, self.session, self.link, self.delivery, self.subject] + return ", ".join([str(o) for o in objects if o is not None]) class ApplicationEvent(Event): """ @@ -286,63 +365,13 @@ class ApplicationEvent(Event): an engine object and or an arbitrary subject """ def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None): - self.type = ExtendedEventType(typename) - self.subject = subject - if delivery: - self.context = delivery - self.clazz = "pn_delivery" - elif link: - self.context = link - self.clazz = "pn_link" - elif session: - self.context = session - self.clazz = "pn_session" - elif connection: - self.context = connection - self.clazz = "pn_connection" - else: - self.context = None - self.clazz = "none" - - def __repr__(self): - objects = [self.context, self.subject] - return "%s(%s)" % (self.type.name, - ", ".join([str(o) for o in objects if o is not None])) + super(ApplicationEvent, self).__init__(PN_PYREF, ApplicationEventContext(connection, session, link, delivery, subject), ExtendedEventType(typename)) class StartEvent(ApplicationEvent): def __init__(self, container): super(StartEvent, self).__init__("start") self.container = container -class ScheduledEvents(Events): - """ - Support for timed events - """ - def __init__(self, *handlers): - super(ScheduledEvents, self).__init__(*handlers) - self._events = [] - - def schedule(self, deadline, event): - heapq.heappush(self._events, (deadline, event)) - - def process(self): - super(ScheduledEvents, self).process() - while self._events and self._events[0][0] <= time.time(): - self.dispatch(heapq.heappop(self._events)[1]) - - @property - def next_interval(self): - if len(self._events): - deadline = self._events[0][0] - now = time.time() - return deadline - now if deadline > now else 0 - else: - return None - - @property - def empty(self): - return super(ScheduledEvents, self).empty and len(self._events) == 0 - def _min(a, b): if a and b: return min(a, b) elif a: return a @@ -380,15 +409,16 @@ class SelectLoop(object): def do_work(self, timeout=None): """@return True if some work was done, False if time-out expired""" - self.events.process() - if self._abort: return + tick = self.events.timer.tick() + while self.events.process(): + if self._abort: return + tick = self.events.timer.tick() stable = False while not stable: reading = [] writing = [] closed = [] - tick = None for s in self.selectables: if s.reading(): reading.append(s) if s.writing(): writing.append(s) @@ -403,12 +433,10 @@ class SelectLoop(object): if self.redundant: return + if tick: + timeout = _min(tick - time.time(), timeout) if timeout and timeout < 0: timeout = 0 - if self.events.next_interval and (timeout is None or self.events.next_interval < timeout): - timeout = self.events.next_interval - if tick: - timeout = _min(tick, timeout) if reading or writing or timeout: readable, writable, _ = select(reading, writing, [], timeout) for s in self.selectables: @@ -683,7 +711,7 @@ class Container(object): def __init__(self, *handlers): h = [Connector(self), ScopedHandler()] h.extend(nested_handlers(handlers)) - self.events = ScheduledEvents(*h) + self.events = Events(*h) self.loop = SelectLoop(self.events) self.trigger = None self.container_id = str(generate_uuid()) @@ -776,11 +804,11 @@ class Container(object): return AmqpAcceptor(self.events, self, host, port) def schedule(self, deadline, connection=None, session=None, link=None, delivery=None, subject=None): - self.events.schedule(deadline, ApplicationEvent("timer", connection, session, link, delivery, subject)) + self.events.timer.schedule(deadline, ApplicationEvent("timer", connection, session, link, delivery, subject)) def get_event_trigger(self): if not self.trigger or self.trigger.closed(): - self.trigger = EventInjector(self.events) + self.trigger = EventInjector(self.events.collector) self.add(self.trigger) return self.trigger --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
