make container a subclass of reactor and remove redundant code which is replaced by reactor
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/6136f112 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/6136f112 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/6136f112 Branch: refs/heads/master Commit: 6136f1125fabeb28b30244b1597ad822b38ed413 Parents: b701147 Author: Gordon Sim <[email protected]> Authored: Wed Feb 11 22:57:28 2015 +0000 Committer: Gordon Sim <[email protected]> Committed: Thu Feb 12 17:11:07 2015 +0000 ---------------------------------------------------------------------- examples/engine/py/client_http.py | 13 +- examples/engine/py/db_send.py | 9 +- examples/engine/py/helloworld.py | 1 - examples/engine/py/helloworld_direct_tornado.py | 17 +- examples/engine/py/helloworld_tornado.py | 9 +- examples/engine/py/proton_tornado.py | 124 ++-- examples/engine/py/recurring_timer.py | 11 +- examples/engine/py/test_examples.py | 9 +- proton-c/bindings/python/proton/handlers.py | 12 + proton-c/bindings/python/proton/reactors.py | 568 ++++--------------- proton-c/bindings/python/proton/utils.py | 12 +- 11 files changed, 250 insertions(+), 535 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6136f112/examples/engine/py/client_http.py ---------------------------------------------------------------------- diff --git a/examples/engine/py/client_http.py b/examples/engine/py/client_http.py index aa46ed9..cd0d63f 100755 --- a/examples/engine/py/client_http.py +++ b/examples/engine/py/client_http.py @@ -18,11 +18,11 @@ # under the License. # +import tornado.ioloop +import tornado.web from proton import Message from proton.handlers import MessagingHandler -from proton_tornado import TornadoLoop -from tornado.ioloop import IOLoop -import tornado.web +from proton_tornado import Container class Client(MessagingHandler): def __init__(self, host, address): @@ -65,6 +65,7 @@ class Client(MessagingHandler): def request(self, body, handler): self.pending.append((body, handler)) self.do_request() + self.container.touch() class ExampleHandler(tornado.web.RequestHandler): def initialize(self, client): @@ -100,11 +101,13 @@ class ExampleHandler(tornado.web.RequestHandler): '</form>') +loop = tornado.ioloop.IOLoop.instance() client = Client("localhost:5672", "examples") -loop = TornadoLoop(client) +client.container = Container(client, loop=loop) +client.container.initialise() app = tornado.web.Application([tornado.web.url(r"/client", ExampleHandler, dict(client=client))]) app.listen(8888) try: - loop.run() + loop.start() except KeyboardInterrupt: loop.stop() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6136f112/examples/engine/py/db_send.py ---------------------------------------------------------------------- diff --git a/examples/engine/py/db_send.py b/examples/engine/py/db_send.py index 1e1ad3f..c2e3fc2 100755 --- a/examples/engine/py/db_send.py +++ b/examples/engine/py/db_send.py @@ -50,7 +50,7 @@ class Send(MessagingHandler): if event.subject == self.load_count: print "Exhausted available data, waiting to recheck..." # check for new data after 5 seconds - self.container.schedule(time.time() + 5, link=self.sender, subject="data") + self.container.schedule(5, self) else: self.send() @@ -86,10 +86,9 @@ class Send(MessagingHandler): self.db.reset() self.sent = self.confirmed - def on_timer(self, event): - if event.subject == "data": - print "Rechecking for data..." - self.request_records() + def on_timer_task(self, event): + print "Rechecking for data..." + self.request_records() parser = optparse.OptionParser(usage="usage: %prog [options]", description="Send messages to the supplied address.") http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6136f112/examples/engine/py/helloworld.py ---------------------------------------------------------------------- diff --git a/examples/engine/py/helloworld.py b/examples/engine/py/helloworld.py index 4ec53ca..e1ea2ee 100755 --- a/examples/engine/py/helloworld.py +++ b/examples/engine/py/helloworld.py @@ -42,4 +42,3 @@ class HelloWorld(MessagingHandler): event.connection.close() Container(HelloWorld("localhost:5672", "examples")).run() - http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6136f112/examples/engine/py/helloworld_direct_tornado.py ---------------------------------------------------------------------- diff --git a/examples/engine/py/helloworld_direct_tornado.py b/examples/engine/py/helloworld_direct_tornado.py index e798dae..2466f80 100755 --- a/examples/engine/py/helloworld_direct_tornado.py +++ b/examples/engine/py/helloworld_direct_tornado.py @@ -20,19 +20,16 @@ from proton import Message from proton.handlers import MessagingHandler -from proton_tornado import TornadoLoop +from proton_tornado import Container class HelloWorld(MessagingHandler): - def __init__(self, server, address): + def __init__(self, url): super(HelloWorld, self).__init__() - self.server = server - self.address = address + self.url = url def on_start(self, event): - self.eventloop = event.container - self.acceptor = event.container.listen(self.server) - conn = event.container.connect(self.server) - event.container.create_sender(conn, self.address) + self.acceptor = event.container.listen(self.url) + event.container.create_sender(self.url) def on_sendable(self, event): event.sender.send(Message(body=u"Hello World!")) @@ -46,7 +43,5 @@ class HelloWorld(MessagingHandler): def on_connection_closed(self, event): self.acceptor.close() - self.eventloop.stop() - -TornadoLoop(HelloWorld("localhost:8888", "examples")).run() +Container(HelloWorld("localhost:8888/examples")).run() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6136f112/examples/engine/py/helloworld_tornado.py ---------------------------------------------------------------------- diff --git a/examples/engine/py/helloworld_tornado.py b/examples/engine/py/helloworld_tornado.py index c56ca8d..d4b32cf 100755 --- a/examples/engine/py/helloworld_tornado.py +++ b/examples/engine/py/helloworld_tornado.py @@ -20,7 +20,7 @@ from proton import Message from proton.handlers import MessagingHandler -from proton_tornado import TornadoLoop +from proton_tornado import Container class HelloWorld(MessagingHandler): def __init__(self, server, address): @@ -29,7 +29,6 @@ class HelloWorld(MessagingHandler): self.address = address def on_start(self, event): - self.eventloop = event.container conn = event.container.connect(self.server) event.container.create_receiver(conn, self.address) event.container.create_sender(conn, self.address) @@ -42,8 +41,4 @@ class HelloWorld(MessagingHandler): print event.message.body event.connection.close() - def on_connection_closed(self, event): - self.eventloop.stop() - -TornadoLoop(HelloWorld("localhost:5672", "examples")).run() - +Container(HelloWorld("localhost:5672", "examples")).run() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6136f112/examples/engine/py/proton_tornado.py ---------------------------------------------------------------------- diff --git a/examples/engine/py/proton_tornado.py b/examples/engine/py/proton_tornado.py index cfe7d6f..d4afeba 100755 --- a/examples/engine/py/proton_tornado.py +++ b/examples/engine/py/proton_tornado.py @@ -18,53 +18,97 @@ # under the License. # -from proton.reactors import ApplicationEvent, Container, StartEvent import tornado.ioloop +from proton.reactors import Container as BaseContainer +from proton.handlers import IOHandler -class TornadoLoop(Container): - def __init__(self, *handlers): - super(TornadoLoop, self).__init__(*handlers) - self.loop = tornado.ioloop.IOLoop.current() +class TornadoLoopHandler: - def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None): - conn = super(TornadoLoop, self).connect(url, urls, address, handler, reconnect) - self.events.process() - return conn + def __init__(self, loop=None, handler_base=None): + self.loop = loop or tornado.ioloop.IOLoop.instance() + self.io = handler_base or IOHandler() + self.count = 0 - def schedule(self, deadline, connection=None, session=None, link=None, delivery=None, subject=None): - self.loop.call_at(deadline, self.events.dispatch, ApplicationEvent("timer", connection, session, link, delivery, subject)) + def on_reactor_init(self, event): + self.reactor = event.reactor - def add(self, conn): - self.loop.add_handler(conn, self._connection_ready, tornado.ioloop.IOLoop.READ | tornado.ioloop.IOLoop.WRITE) + def on_reactor_quiesced(self, event): + event.reactor.yield_() - def remove(self, conn): - self.loop.remove_handler(conn) + def on_unhandled(self, name, event): + event.dispatch(self.io) - def run(self): - self.events.dispatch(StartEvent(self)) - self.loop.start() + def _events(self, sel): + events = self.loop.ERROR + if sel.reading: + events |= self.loop.READ + if sel.writing: + events |= self.loop.WRITE + return events + + def _schedule(self, sel): + if sel.deadline: + self.loop.add_timeout(sel.deadline, lambda: self.expired(sel)) + + def _expired(self, sel): + sel.expired() + + def _process(self): + self.reactor.process() + if not self.reactor.quiesced: + self.loop.add_callback(self._process) + + def _callback(self, sel, events): + if self.loop.READ & events: + sel.readable() + if self.loop.WRITE & events: + sel.writable() + self._process() + + def on_selectable_init(self, event): + sel = event.context + if sel.fileno() >= 0: + self.loop.add_handler(sel.fileno(), lambda fd, events: self._callback(sel, events), self._events(sel)) + self._schedule(sel) + self.count += 1 + + def on_selectable_updated(self, event): + sel = event.context + if sel.fileno() > 0: + self.loop.update_handler(sel.fileno(), self._events(sel)) + self._schedule(sel) - def stop(self): + def on_selectable_final(self, event): + sel = event.context + if sel.fileno() > 0: + self.loop.remove_handler(sel.fileno()) + sel.release() + self.count -= 1 + if self.count == 0: + self.loop.add_callback(self._stop) + + def _stop(self): + self.reactor.stop() self.loop.stop() - def _get_event_flags(self, conn): - flags = 0 - if conn.reading(): - flags |= tornado.ioloop.IOLoop.READ - # FIXME: need way to update flags to avoid busy loop - #if conn.writing(): - # flags |= tornado.ioloop.IOLoop.WRITE - flags |= tornado.ioloop.IOLoop.WRITE - return flags - - def _connection_ready(self, conn, events): - if events & tornado.ioloop.IOLoop.READ: - conn.readable() - if events & tornado.ioloop.IOLoop.WRITE: - conn.writable() - if events & tornado.ioloop.IOLoop.ERROR:# or conn.closed(): - self.loop.remove_handler(conn) - conn.close() - conn.removed() - self.events.process() - self.loop.update_handler(conn, self._get_event_flags(conn)) +class Container(object): + def __init__(self, *handlers, **kwargs): + self.tornado_loop = kwargs.get('loop', tornado.ioloop.IOLoop.instance()) + kwargs['global_handler'] = TornadoLoopHandler(self.tornado_loop, kwargs.get('handler_base', None)) + self.container = BaseContainer(*handlers, **kwargs) + + def initialise(self): + self.container.start() + self.container.process() + + def run(self): + self.initialise() + self.tornado_loop.start() + + def touch(self): + self._process() + + def _process(self): + self.container.process() + if not self.container.quiesced: + self.tornado_loop.add_callback(self._process) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6136f112/examples/engine/py/recurring_timer.py ---------------------------------------------------------------------- diff --git a/examples/engine/py/recurring_timer.py b/examples/engine/py/recurring_timer.py index de530d3..c8d5acf 100755 --- a/examples/engine/py/recurring_timer.py +++ b/examples/engine/py/recurring_timer.py @@ -18,20 +18,19 @@ # under the License. # -import time from proton.reactors import Container, Handler class Recurring(Handler): def __init__(self, period): self.period = period - def on_start(self, event): - self.container = event.container - self.container.schedule(time.time() + self.period, subject=self) + def on_reactor_init(self, event): + self.container = event.reactor + self.container.schedule(self.period, self) - def on_timer(self, event): + def on_timer_task(self, event): print "Tick..." - self.container.schedule(time.time() + self.period, subject=self) + self.container.schedule(self.period, self) try: container = Container(Recurring(1.0)) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6136f112/examples/engine/py/test_examples.py ---------------------------------------------------------------------- diff --git a/examples/engine/py/test_examples.py b/examples/engine/py/test_examples.py index ed0aabc..b46b85b 100644 --- a/examples/engine/py/test_examples.py +++ b/examples/engine/py/test_examples.py @@ -18,6 +18,7 @@ # import subprocess +import time import unittest class ExamplesTest(unittest.TestCase): @@ -48,8 +49,10 @@ class ExamplesTest(unittest.TestCase): expected = ["{'sequence': %iL}" % (i+1) for i in range(100)] self.assertEqual(actual, expected) - def test_client_server(self, client=['client.py'], server=['server.py']): + def test_client_server(self, client=['client.py'], server=['server.py'], sleep=0): s = subprocess.Popen(server, stderr=subprocess.STDOUT, stdout=subprocess.PIPE) + if sleep: + time.sleep(sleep) c = subprocess.Popen(client, stderr=subprocess.STDOUT, stdout=subprocess.PIPE) c.wait() s.terminate() @@ -71,10 +74,10 @@ class ExamplesTest(unittest.TestCase): self.test_client_server(client=['sync_client.py'], server=['server_tx.py']) def test_client_server_direct(self): - self.test_client_server(client=['client.py', '-a', 'localhost:8888/examples'], server=['server_direct.py']) + self.test_client_server(client=['client.py', '-a', 'localhost:8888/examples'], server=['server_direct.py'], sleep=0.5) def test_sync_client_server_direct(self): - self.test_client_server(client=['sync_client.py', 'localhost:8888/examples'], server=['server_direct.py']) + self.test_client_server(client=['sync_client.py', 'localhost:8888/examples'], server=['server_direct.py'], sleep=0.5) def test_db_send_recv(self): self.maxDiff = None http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6136f112/proton-c/bindings/python/proton/handlers.py ---------------------------------------------------------------------- diff --git a/proton-c/bindings/python/proton/handlers.py b/proton-c/bindings/python/proton/handlers.py index f735647..56c6c0a 100644 --- a/proton-c/bindings/python/proton/handlers.py +++ b/proton-c/bindings/python/proton/handlers.py @@ -342,6 +342,10 @@ class EndpointStateHandler(Handler): elif self.peer_close_is_error: self.on_link_error(event) + def on_transport_closed(self, event): + if self.delegate: + dispatch(self.delegate, 'on_disconnected', event) + class MessagingHandler(Handler, Acking): """ A general purpose handler that makes the proton-c events somewhat @@ -367,6 +371,14 @@ class MessagingHandler(Handler, Acking): EndpointStateHandler.print_error(event.link, "link") event.connection.close() + def on_reactor_init(self, event): + if hasattr(event.reactor, 'subclass'): + setattr(event, event.reactor.subclass.__name__.lower(), event.reactor) + self.on_start(event) + + def on_start(self, event): pass + def on_disconnected(self, event): pass + class TransactionHandler(object): """ The interface for transaction handlers, i.e. objects that want to http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6136f112/proton-c/bindings/python/proton/reactors.py ---------------------------------------------------------------------- diff --git a/proton-c/bindings/python/proton/reactors.py b/proton-c/bindings/python/proton/reactors.py index 7da3531..8b4c532 100644 --- a/proton-c/bindings/python/proton/reactors.py +++ b/proton-c/bindings/python/proton/reactors.py @@ -179,189 +179,6 @@ from proton import wrappers as _wrappers _wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x)) _wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x)) -class AmqpSocket(object): - """ - Associates a transport with a connection and a socket and can be - used in an io loop to track the io for an AMQP 1.0 connection. - """ - - def __init__(self, conn, sock, events, heartbeat=None): - self.events = events - self.conn = conn - self.transport = Transport() - if heartbeat: self.transport.idle_timeout = heartbeat - self.transport.bind(self.conn) - self.socket = sock - self.socket.setblocking(0) - self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - self.write_done = False - self.read_done = False - self._closed = False - - def accept(self, force_sasl=True, ssl_domain=None): - if ssl_domain: - self.ssl = SSL(self.transport, ssl_domain) - if force_sasl: - sasl = self.transport.sasl() - sasl.mechanisms("ANONYMOUS") - sasl.server() - sasl.done(SASL.OK) - #TODO: use SASL anyway if requested by peer - return self - - def connect(self, host, port=None, username=None, password=None, force_sasl=True, ssl_domain=None): - if ssl_domain: - self.ssl = SSL(self.transport, ssl_domain) - self.ssl.peer_hostname = host - if username and password: - sasl = self.transport.sasl() - sasl.plain(username, password) - elif force_sasl: - sasl = self.transport.sasl() - sasl.mechanisms('ANONYMOUS') - sasl.client() - try: - self.socket.connect_ex((host, port or 5672)) - except socket.gaierror, e: - raise ConnectionException("Cannot resolve '%s': %s" % (host, e)) - return self - - def _closed_cleanly(self): - return self.conn.state & Endpoint.LOCAL_CLOSED and self.conn.state & Endpoint.REMOTE_CLOSED - - def closed(self): - if not self._closed and self.write_done and self.read_done: - self.close() - return True - else: - return False - - def close(self): - self.socket.close() - self._closed = True - - def fileno(self): - return self.socket.fileno() - - def reading(self): - if self.read_done: return False - c = self.transport.capacity() - if c > 0: - return True - elif c < 0: - self.read_done = True - return False - - def writing(self): - if self.write_done: return False - try: - p = self.transport.pending() - if p > 0: - return True - elif p < 0: - self.write_done = True - return False - else: # p == 0 - return False - except TransportException, e: - self.write_done = True - return False - - def readable(self): - c = self.transport.capacity() - if c > 0: - try: - data = self.socket.recv(c) - if data: - self.transport.push(data) - else: - if not self._closed_cleanly(): - self.read_done = True - self.write_done = True - else: - self.transport.close_tail() - except TransportException, e: - logging.error("Error on read: %s" % e) - self.read_done = True - except socket.error, e: - logging.error("Error on recv: %s" % e) - self.read_done = True - self.write_done = True - elif c < 0: - self.read_done = True - - def writable(self): - try: - p = self.transport.pending() - if p > 0: - data = self.transport.peek(p) - n = self.socket.send(data) - self.transport.pop(n) - elif p < 0: - self.write_done = True - except TransportException, e: - logging.error("Error on write: %s" % e) - self.write_done = True - except socket.error, e: - logging.error("Error on send: %s" % e) - self.write_done = True - - def removed(self): - if not self._closed_cleanly(): - self.transport.unbind() - self.events.dispatch(ApplicationEvent("disconnected", connection=self.conn)) - - def tick(self): - t = self.transport.tick(time.time()) - if t: return t - else: return None - -class AmqpAcceptor: - """ - Listens for incoming sockets, creates an AmqpSocket for them and - adds that to the list of tracked 'selectables'. The acceptor can - itself be added to an io loop. - """ - - def __init__(self, events, loop, host, port, ssl_domain=None): - self.events = events - self.loop = loop - self.socket = socket.socket() - self.socket.setblocking(0) - self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.socket.bind((host, port)) - self.socket.listen(5) - self.ssl_domain = ssl_domain - self.loop.add(self) - self._closed = False - - def closed(self): - if self._closed: - self.socket.close() - return True - else: - return False - - def close(self): - self._closed = True - - def fileno(self): - return self.socket.fileno() - - def reading(self): - return not self._closed - - def writing(self): - return False - - def readable(self): - sock, addr = self.socket.accept() - if sock: - self.loop.add(AmqpSocket(self.events.connection(), sock, self.events).accept(ssl_domain=self.ssl_domain)) - - def removed(self): pass - def tick(self): return None - class EventInjector(object): """ @@ -369,8 +186,7 @@ class EventInjector(object): external thread but handled on the event thread associated with the loop. """ - def __init__(self, collector): - self.collector = collector + def __init__(self): self.queue = Queue.Queue() self.pipe = os.pipe() self._closed = False @@ -379,9 +195,6 @@ class EventInjector(object): self.queue.put(event) os.write(self.pipe[1], "!") - def closed(self): - return self._closed and self.queue.empty() - def close(self): self._closed = True os.write(self.pipe[1], "!") @@ -389,98 +202,22 @@ class EventInjector(object): def fileno(self): return self.pipe[0] - def reading(self): - return not self.closed() - - def writing(self): - return False + def on_selectable_init(self, event): + sel = event.context + sel.fileno(self.fileno()) + sel.reading = True + event.reactor.update(sel) - def readable(self): + def on_selectable_readable(self, event): os.read(self.pipe[0], 512) while not self.queue.empty(): - event = self.queue.get() - self.collector.put(event.context, event.type) - - def removed(self): pass - def tick(self): return None - -class PQueue: - - def __init__(self): - self.entries = [] - - def add(self, priority, task): - heappush(self.entries, (priority, task)) - - def peek(self): - if self.entries: - return nsmallest(1, self.entries)[0] - else: - return None - - def pop(self): - if self.entries: - return heappop(self.entries) - else: - return None - - def __nonzero__(self): - if self.entries: - return True - else: - return False - -class Timer: - def __init__(self, collector): - self.collector = collector - self.events = PQueue() - - def schedule(self, deadline, event): - self.events.add(deadline, event) - - def tick(self): - while self.events: - deadline, event = self.events.peek() - if time.time() > deadline: - self.events.pop() - self.collector.put(event.context, event.type) - else: - return deadline - return None - - @property - def pending(self): - return bool(self.events) - -class Events(object): - def __init__(self, *handlers): - self.collector = Collector() - self.timer = Timer(self.collector) - self.handlers = handlers - - def connection(self): - conn = Connection() - conn.collect(self.collector) - return conn - - def process(self): - result = False - while True: - ev = self.collector.peek() - if ev: - self.dispatch(ev) - self.collector.pop() - result = True - else: - return result - - def dispatch(self, event): - for h in self.handlers: - event.dispatch(h) + requested = self.queue.get() + event.reactor.push_event(requested.context, requested.type) + if self._closed: + s = event.context + s.terminate() + event.reactor.update(s) - @property - def empty(self): - return self.collector.peek() == None and not self.timer.pending class Names(object): def __init__(self, base=10000): @@ -530,88 +267,6 @@ class StartEvent(ApplicationEvent): super(StartEvent, self).__init__("start") self.container = container -def _min(a, b): - if a and b: return min(a, b) - elif a: return a - else: return b - -class SelectLoop(object): - """ - An io loop based on select() - """ - def __init__(self, events): - self.events = events - self.selectables = [] - self._abort = False - - def abort(self): - self._abort = True - - def add(self, selectable): - self.selectables.append(selectable) - - def remove(self, selectable): - self.selectables.remove(selectable) - - @property - def redundant(self): - return self.events.empty and not self.selectables - - @property - def aborted(self): - return self._abort - - def run(self): - while not (self._abort or self.redundant): - self.do_work() - - def do_work(self, timeout=None): - """@return True if some work was done, False if time-out expired""" - tick = self.events.timer.tick() - - if self.events.process(): - tick = self.events.timer.tick() - while self.events.process(): - if self._abort: return - tick = self.events.timer.tick() - return True # Did work, let caller check their conditions, don't select. - - stable = False - while not stable: - reading = [] - writing = [] - closed = [] - for s in self.selectables: - if s.reading(): reading.append(s) - if s.writing(): writing.append(s) - if s.closed(): closed.append(s) - else: tick = _min(tick, s.tick()) - - for s in closed: - self.selectables.remove(s) - s.removed() - stable = len(closed) == 0 - - if self.redundant: - return False - - if tick: - timeout = _min(tick - time.time(), timeout) - if timeout and timeout < 0: - timeout = 0 - if reading or writing or timeout: - readable, writable, _ = select(reading, writing, [], timeout) - for s in self.selectables: - s.tick() - for s in readable: - s.readable() - for s in writable: - s.writable() - - return bool(readable or writable) - else: - return False - class Transaction(object): """ @@ -626,15 +281,6 @@ class Transaction(object): self.failed = False self._pending = [] self.settle_before_discharge = settle_before_discharge - class InternalTransactionHandler(OutgoingMessageHandler): - def __init__(self): - super(InternalTransactionHandler, self).__init__(auto_settle=True) - - def on_settled(self, event): - if hasattr(event.delivery, "transaction"): - event.transaction = event.delivery.transaction - event.delivery.transaction.handle_outcome(event) - self.internal_handler = InternalTransactionHandler() self.declare() def commit(self): @@ -652,7 +298,6 @@ class Transaction(object): def _send_ctrl(self, descriptor, value): delivery = self.txn_ctrl.send(Message(body=Described(descriptor, value))) - delivery.context=self.internal_handler delivery.transaction = self return delivery @@ -803,58 +448,83 @@ class SessionPerConnection(object): event.connection.close() self._default_session = None +class GlobalOverrides(object): + """ + Internal handler that triggers the necessary socket connect for an + opened connection. + """ + def __init__(self, base): + self.base = base + + def on_unhandled(self, name, event): + if not self._override(event): + event.dispatch(self.base) + + def _override(self, event): + conn = event.connection + return conn and hasattr(conn, '_overrides') and event.dispatch(conn._overrides) + class Connector(Handler): """ Internal handler that triggers the necessary socket connect for an opened connection. """ - def __init__(self, loop, ssl_domain=None): - self.loop = loop - self.ssl_domain = ssl_domain - - def _get_ssl_domain(self, connection, scheme): - if hasattr(connection, 'ssl_domain'): - return connection.ssl_domain - elif scheme == 'amqps': - return self.ssl_domain - else: - return None + def __init__(self, connection): + self.connection = connection + self.address = None + self.heartbeat = None + self.reconnect = None + self.ssl_domain = None def _connect(self, connection): - url = connection.address.next() - logging.info("connecting to %s:%i" % (url.host, url.port)) - heartbeat = None - if hasattr(connection, 'heartbeat'): - heartbeat = connection.heartbeat - s = AmqpSocket(connection, socket.socket(), self.loop.events, heartbeat=heartbeat) - s.connect(url.host, url.port, username=url.username, password=url.password, ssl_domain=self._get_ssl_domain(connection, url.scheme)) - self.loop.add(s) - connection._pin = None #connection is now referenced by AmqpSocket, so no need for circular reference + url = self.address.next() + # IoHandler uses the hostname to determine where to try to connect to + connection.hostname = "%s:%i" % (url.host, url.port) + logging.info("connecting to %s..." % connection.hostname) + + transport = Transport() + transport.bind(connection) + if self.heartbeat: + transport.idle_timeout = self.heartbeat + if url.scheme == 'amqps' and self.ssl_domain: + self.ssl = SSL(transport, self.ssl_domain) + self.ssl.peer_hostname = url.host + if url.username: + sasl = transport.sasl() + if url.username == 'anonymous': + sasl.mechanisms('ANONYMOUS') + else: + sasl.plain(url.username, url.password) def on_connection_local_open(self, event): - if hasattr(event.connection, "address"): - self._connect(event.connection) + self._connect(event.connection) def on_connection_remote_open(self, event): - if hasattr(event.connection, "reconnect"): - event.connection.reconnect.reset() - - def on_disconnected(self, event): - if hasattr(event.connection, "reconnect"): - event.connection._pin = event.connection #no longer referenced by AmqpSocket, so pin in memory with circular reference - delay = event.connection.reconnect.next() - if delay == 0: - logging.info("Disconnected, reconnecting...") - self._connect(event.connection) + logging.info("connected to %s" % event.connection.hostname) + if self.reconnect: + self.reconnect.reset() + self.transport = None + + def on_transport_closed(self, event): + if self.connection and self.connection.state & Endpoint.LOCAL_ACTIVE: + if self.reconnect: + event.transport.unbind() + delay = self.reconnect.next() + if delay == 0: + logging.info("Disconnected, reconnecting...") + self._connect(self.connection) + else: + logging.info("Disconnected will try to reconnect after %s seconds" % delay) + event.reactor.schedule(delay, self) else: - logging.info("Disconnected will try to reconnect after %s seconds" % delay) - self.loop.schedule(time.time() + delay, connection=event.connection, subject=self) - else: - logging.info("Disconnected") + logging.info("Disconnected") + self.connection = None - def on_timer(self, event): - if event.subject == self and event.connection: - self._connect(event.connection) + def on_timer_task(self, event): + self._connect(self.connection) + + def on_connection_remote_close(self, event): + self.connection = None class Backoff(object): """ @@ -904,33 +574,33 @@ class SSLConfig(object): self.server.set_trusted_ca_db(certificate_db) -class Container(object): - def __init__(self, *handlers): - self.ssl = SSLConfig() - h = [Connector(self, self.ssl.client), ScopedHandler()] - h.extend(handlers) - self.events = Events(*h) - self.loop = SelectLoop(self.events) - self.trigger = None - self.container_id = str(generate_uuid()) +class Container(Reactor): + def __init__(self, *handlers, **kwargs): + super(Container, self).__init__(*handlers, **kwargs) + if "impl" not in kwargs: + self.ssl = SSLConfig() + self.global_handler = GlobalOverrides(kwargs.get('global_handler', self.global_handler)) + self.trigger = None + self.container_id = str(generate_uuid()) + Wrapper.__setattr__(self, 'subclass', self.__class__) def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None): - conn = self.events.connection() - conn._pin = conn #circular reference until the open event gets handled - if handler: - conn.context = handler + conn = self.connection(handler) conn.container = self.container_id or str(generate_uuid()) - conn.heartbeat = heartbeat - if url: conn.address = Urls([url]) - elif urls: conn.address = Urls(urls) - elif address: conn.address = address + + connector = Connector(conn) + conn._overrides = connector + if url: connector.address = Urls([url]) + elif urls: connector.address = Urls(urls) + elif address: connector.address = address else: raise ValueError("One of url, urls or address required") + if heartbeat: + connector.heartbeat = heartbeat if reconnect: - conn.reconnect = reconnect + connector.reconnect = reconnect elif reconnect is None: - conn.reconnect = Backoff() - if ssl_domain: - conn.ssl_domain = ssl_domain + connector.reconnect = Backoff() + connector.ssl_domain = ssl_domain or self.ssl.client conn._session_policy = SessionPerConnection() #todo: make configurable conn.open() return conn @@ -966,7 +636,7 @@ class Container(object): if target: snd.target.address = target if handler: - snd.context = handler + snd.handler = handler if tags: snd.tag_generator = tags _apply_link_options(options, snd) @@ -987,46 +657,40 @@ class Container(object): if target: rcv.target.address = target if handler: - rcv.context = handler + rcv.handler = handler _apply_link_options(options, rcv) rcv.open() return rcv def declare_transaction(self, context, handler=None, settle_before_discharge=False): if not _get_attr(context, '_txn_ctrl'): - context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl') + class InternalTransactionHandler(OutgoingMessageHandler): + def __init__(self): + super(InternalTransactionHandler, self).__init__(auto_settle=True) + + def on_settled(self, event): + if hasattr(event.delivery, "transaction"): + event.transaction = event.delivery.transaction + event.delivery.transaction.handle_outcome(event) + context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl', handler=InternalTransactionHandler()) context._txn_ctrl.target.type = Terminus.COORDINATOR context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions')) return Transaction(context._txn_ctrl, handler, settle_before_discharge) def listen(self, url, ssl_domain=None): - url = Urls([url]).next() + url = Url(url) ssl_config = ssl_domain if not ssl_config and url.scheme == 'amqps': ssl_config = self.ssl_domain - return AmqpAcceptor(self.events, self, url.host, url.port, ssl_domain=ssl_config) - - def schedule(self, deadline, connection=None, session=None, link=None, delivery=None, subject=None): - self.events.timer.schedule(deadline, ApplicationEvent("timer", connection, session, link, delivery, subject)) + return self.acceptor(url.host, url.port) def get_event_trigger(self): if not self.trigger or self.trigger.closed(): - self.trigger = EventInjector(self.events.collector) - self.add(self.trigger) + self.trigger = EventInjector() + self.selectable(self.trigger) return self.trigger - def add(self, selectable): - self.loop.add(selectable) - - def remove(self, selectable): - self.loop.remove(selectable) - - def run(self): - self.events.dispatch(StartEvent(self)) - self.loop.run() - - def stop(self): - self.loop.abort() - def do_work(self, timeout=None): - return self.loop.do_work(timeout) + if timeout: + self.timeout = timeout + return self.process() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6136f112/proton-c/bindings/python/proton/utils.py ---------------------------------------------------------------------- diff --git a/proton-c/bindings/python/proton/utils.py b/proton-c/bindings/python/proton/utils.py index 6ea5b4b..924f3d9 100644 --- a/proton-c/bindings/python/proton/utils.py +++ b/proton-c/bindings/python/proton/utils.py @@ -19,8 +19,8 @@ import collections, Queue, socket, time, threading from proton import ConnectionException, Delivery, Endpoint, Handler, LinkException, Message from proton import ProtonException, Timeout, Url -from proton.reactors import AmqpSocket, Container, Events, SelectLoop -from proton.handlers import Acking, MessagingHandler, ScopedHandler, IncomingMessageHandler +from proton.reactors import Container +from proton.handlers import MessagingHandler, IncomingMessageHandler def utf8(s): if isinstance(s, unicode): @@ -190,6 +190,7 @@ class BlockingConnection(Handler): def __init__(self, url, timeout=None, container=None, ssl_domain=None): self.timeout = timeout self.container = container or Container() + self.container.start() self.url = Url(utf8(url)).defaults() self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain) self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT), @@ -216,7 +217,7 @@ class BlockingConnection(Handler): def run(self): """ Hand control over to the event loop (e.g. if waiting indefinitely for incoming messages) """ - self.container.run() + while self.container.process(): pass def wait(self, condition, timeout=False, msg=None): """Call do_work until condition() is true""" @@ -224,11 +225,12 @@ class BlockingConnection(Handler): timeout = self.timeout if timeout is None: while not condition(): - self.container.do_work() + self.container.process() else: deadline = time.time() + timeout while not condition(): - if not self.container.do_work(deadline - time.time()): + self.container.process() + if deadline < time.time(): txt = "Connection %s timed out" % self.url if msg: txt += ": " + msg raise Timeout(txt) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
