Make Container a subclass of Reactor and remove the redundant code that the
reactor now replaces


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/6136f112
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/6136f112
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/6136f112

Branch: refs/heads/master
Commit: 6136f1125fabeb28b30244b1597ad822b38ed413
Parents: b701147
Author: Gordon Sim <[email protected]>
Authored: Wed Feb 11 22:57:28 2015 +0000
Committer: Gordon Sim <[email protected]>
Committed: Thu Feb 12 17:11:07 2015 +0000

----------------------------------------------------------------------
 examples/engine/py/client_http.py               |  13 +-
 examples/engine/py/db_send.py                   |   9 +-
 examples/engine/py/helloworld.py                |   1 -
 examples/engine/py/helloworld_direct_tornado.py |  17 +-
 examples/engine/py/helloworld_tornado.py        |   9 +-
 examples/engine/py/proton_tornado.py            | 124 ++--
 examples/engine/py/recurring_timer.py           |  11 +-
 examples/engine/py/test_examples.py             |   9 +-
 proton-c/bindings/python/proton/handlers.py     |  12 +
 proton-c/bindings/python/proton/reactors.py     | 568 ++++---------------
 proton-c/bindings/python/proton/utils.py        |  12 +-
 11 files changed, 250 insertions(+), 535 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6136f112/examples/engine/py/client_http.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/client_http.py 
b/examples/engine/py/client_http.py
index aa46ed9..cd0d63f 100755
--- a/examples/engine/py/client_http.py
+++ b/examples/engine/py/client_http.py
@@ -18,11 +18,11 @@
 # under the License.
 #
 
+import tornado.ioloop
+import tornado.web
 from proton import Message
 from proton.handlers import MessagingHandler
-from proton_tornado import TornadoLoop
-from tornado.ioloop import IOLoop
-import tornado.web
+from proton_tornado import Container
 
 class Client(MessagingHandler):
     def __init__(self, host, address):
@@ -65,6 +65,7 @@ class Client(MessagingHandler):
     def request(self, body, handler):
         self.pending.append((body, handler))
         self.do_request()
+        self.container.touch()
 
 class ExampleHandler(tornado.web.RequestHandler):
     def initialize(self, client):
@@ -100,11 +101,13 @@ class ExampleHandler(tornado.web.RequestHandler):
                    '</form>')
 
 
+loop = tornado.ioloop.IOLoop.instance()
 client = Client("localhost:5672", "examples")
-loop = TornadoLoop(client)
+client.container = Container(client, loop=loop)
+client.container.initialise()
 app = tornado.web.Application([tornado.web.url(r"/client", ExampleHandler, 
dict(client=client))])
 app.listen(8888)
 try:
-    loop.run()
+    loop.start()
 except KeyboardInterrupt:
     loop.stop()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6136f112/examples/engine/py/db_send.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/db_send.py b/examples/engine/py/db_send.py
index 1e1ad3f..c2e3fc2 100755
--- a/examples/engine/py/db_send.py
+++ b/examples/engine/py/db_send.py
@@ -50,7 +50,7 @@ class Send(MessagingHandler):
             if event.subject == self.load_count:
                 print "Exhausted available data, waiting to recheck..."
                 # check for new data after 5 seconds
-                self.container.schedule(time.time() + 5, link=self.sender, 
subject="data")
+                self.container.schedule(5, self)
         else:
             self.send()
 
@@ -86,10 +86,9 @@ class Send(MessagingHandler):
         self.db.reset()
         self.sent = self.confirmed
 
-    def on_timer(self, event):
-        if event.subject == "data":
-            print "Rechecking for data..."
-            self.request_records()
+    def on_timer_task(self, event):
+        print "Rechecking for data..."
+        self.request_records()
 
 parser = optparse.OptionParser(usage="usage: %prog [options]",
                                description="Send messages to the supplied 
address.")

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6136f112/examples/engine/py/helloworld.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/helloworld.py b/examples/engine/py/helloworld.py
index 4ec53ca..e1ea2ee 100755
--- a/examples/engine/py/helloworld.py
+++ b/examples/engine/py/helloworld.py
@@ -42,4 +42,3 @@ class HelloWorld(MessagingHandler):
         event.connection.close()
 
 Container(HelloWorld("localhost:5672", "examples")).run()
-

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6136f112/examples/engine/py/helloworld_direct_tornado.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/helloworld_direct_tornado.py 
b/examples/engine/py/helloworld_direct_tornado.py
index e798dae..2466f80 100755
--- a/examples/engine/py/helloworld_direct_tornado.py
+++ b/examples/engine/py/helloworld_direct_tornado.py
@@ -20,19 +20,16 @@
 
 from proton import Message
 from proton.handlers import MessagingHandler
-from proton_tornado import TornadoLoop
+from proton_tornado import Container
 
 class HelloWorld(MessagingHandler):
-    def __init__(self, server, address):
+    def __init__(self, url):
         super(HelloWorld, self).__init__()
-        self.server = server
-        self.address = address
+        self.url = url
 
     def on_start(self, event):
-        self.eventloop = event.container
-        self.acceptor = event.container.listen(self.server)
-        conn = event.container.connect(self.server)
-        event.container.create_sender(conn, self.address)
+        self.acceptor = event.container.listen(self.url)
+        event.container.create_sender(self.url)
 
     def on_sendable(self, event):
         event.sender.send(Message(body=u"Hello World!"))
@@ -46,7 +43,5 @@ class HelloWorld(MessagingHandler):
 
     def on_connection_closed(self, event):
         self.acceptor.close()
-        self.eventloop.stop()
-
-TornadoLoop(HelloWorld("localhost:8888", "examples")).run()
 
+Container(HelloWorld("localhost:8888/examples")).run()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6136f112/examples/engine/py/helloworld_tornado.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/helloworld_tornado.py 
b/examples/engine/py/helloworld_tornado.py
index c56ca8d..d4b32cf 100755
--- a/examples/engine/py/helloworld_tornado.py
+++ b/examples/engine/py/helloworld_tornado.py
@@ -20,7 +20,7 @@
 
 from proton import Message
 from proton.handlers import MessagingHandler
-from proton_tornado import TornadoLoop
+from proton_tornado import Container
 
 class HelloWorld(MessagingHandler):
     def __init__(self, server, address):
@@ -29,7 +29,6 @@ class HelloWorld(MessagingHandler):
         self.address = address
 
     def on_start(self, event):
-        self.eventloop = event.container
         conn = event.container.connect(self.server)
         event.container.create_receiver(conn, self.address)
         event.container.create_sender(conn, self.address)
@@ -42,8 +41,4 @@ class HelloWorld(MessagingHandler):
         print event.message.body
         event.connection.close()
 
-    def on_connection_closed(self, event):
-        self.eventloop.stop()
-
-TornadoLoop(HelloWorld("localhost:5672", "examples")).run()
-
+Container(HelloWorld("localhost:5672", "examples")).run()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6136f112/examples/engine/py/proton_tornado.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/proton_tornado.py 
b/examples/engine/py/proton_tornado.py
index cfe7d6f..d4afeba 100755
--- a/examples/engine/py/proton_tornado.py
+++ b/examples/engine/py/proton_tornado.py
@@ -18,53 +18,97 @@
 # under the License.
 #
 
-from proton.reactors import ApplicationEvent, Container, StartEvent
 import tornado.ioloop
+from proton.reactors import Container as BaseContainer
+from proton.handlers import IOHandler
 
-class TornadoLoop(Container):
-    def __init__(self, *handlers):
-        super(TornadoLoop, self).__init__(*handlers)
-        self.loop = tornado.ioloop.IOLoop.current()
+class TornadoLoopHandler:
 
-    def connect(self, url=None, urls=None, address=None, handler=None, 
reconnect=None):
-        conn = super(TornadoLoop, self).connect(url, urls, address, handler, 
reconnect)
-        self.events.process()
-        return conn
+    def __init__(self, loop=None, handler_base=None):
+        self.loop = loop or tornado.ioloop.IOLoop.instance()
+        self.io = handler_base or IOHandler()
+        self.count = 0
 
-    def schedule(self, deadline, connection=None, session=None, link=None, 
delivery=None, subject=None):
-        self.loop.call_at(deadline, self.events.dispatch, 
ApplicationEvent("timer", connection, session, link, delivery, subject))
+    def on_reactor_init(self, event):
+        self.reactor = event.reactor
 
-    def add(self, conn):
-        self.loop.add_handler(conn, self._connection_ready, 
tornado.ioloop.IOLoop.READ | tornado.ioloop.IOLoop.WRITE)
+    def on_reactor_quiesced(self, event):
+        event.reactor.yield_()
 
-    def remove(self, conn):
-        self.loop.remove_handler(conn)
+    def on_unhandled(self, name, event):
+        event.dispatch(self.io)
 
-    def run(self):
-        self.events.dispatch(StartEvent(self))
-        self.loop.start()
+    def _events(self, sel):
+        events = self.loop.ERROR
+        if sel.reading:
+            events |= self.loop.READ
+        if sel.writing:
+            events |= self.loop.WRITE
+        return events
+
+    def _schedule(self, sel):
+        if sel.deadline:
+            self.loop.add_timeout(sel.deadline, lambda: self.expired(sel))
+
+    def _expired(self, sel):
+        sel.expired()
+
+    def _process(self):
+        self.reactor.process()
+        if not self.reactor.quiesced:
+            self.loop.add_callback(self._process)
+
+    def _callback(self, sel, events):
+        if self.loop.READ & events:
+            sel.readable()
+        if self.loop.WRITE & events:
+            sel.writable()
+        self._process()
+
+    def on_selectable_init(self, event):
+        sel = event.context
+        if sel.fileno() >= 0:
+            self.loop.add_handler(sel.fileno(), lambda fd, events: 
self._callback(sel, events), self._events(sel))
+        self._schedule(sel)
+        self.count += 1
+
+    def on_selectable_updated(self, event):
+        sel = event.context
+        if sel.fileno() > 0:
+            self.loop.update_handler(sel.fileno(), self._events(sel))
+        self._schedule(sel)
 
-    def stop(self):
+    def on_selectable_final(self, event):
+        sel = event.context
+        if sel.fileno() > 0:
+            self.loop.remove_handler(sel.fileno())
+        sel.release()
+        self.count -= 1
+        if self.count == 0:
+            self.loop.add_callback(self._stop)
+
+    def _stop(self):
+        self.reactor.stop()
         self.loop.stop()
 
-    def _get_event_flags(self, conn):
-        flags = 0
-        if conn.reading():
-            flags |= tornado.ioloop.IOLoop.READ
-        # FIXME: need way to update flags to avoid busy loop
-        #if conn.writing():
-        #    flags |= tornado.ioloop.IOLoop.WRITE
-        flags |= tornado.ioloop.IOLoop.WRITE
-        return flags
-
-    def _connection_ready(self, conn, events):
-        if events & tornado.ioloop.IOLoop.READ:
-            conn.readable()
-        if events & tornado.ioloop.IOLoop.WRITE:
-            conn.writable()
-        if events & tornado.ioloop.IOLoop.ERROR:# or conn.closed():
-            self.loop.remove_handler(conn)
-            conn.close()
-            conn.removed()
-        self.events.process()
-        self.loop.update_handler(conn, self._get_event_flags(conn))
+class Container(object):
+    def __init__(self, *handlers, **kwargs):
+        self.tornado_loop = kwargs.get('loop', 
tornado.ioloop.IOLoop.instance())
+        kwargs['global_handler'] = TornadoLoopHandler(self.tornado_loop, 
kwargs.get('handler_base', None))
+        self.container = BaseContainer(*handlers, **kwargs)
+
+    def initialise(self):
+        self.container.start()
+        self.container.process()
+
+    def run(self):
+        self.initialise()
+        self.tornado_loop.start()
+
+    def touch(self):
+        self._process()
+
+    def _process(self):
+        self.container.process()
+        if not self.container.quiesced:
+            self.tornado_loop.add_callback(self._process)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6136f112/examples/engine/py/recurring_timer.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/recurring_timer.py 
b/examples/engine/py/recurring_timer.py
index de530d3..c8d5acf 100755
--- a/examples/engine/py/recurring_timer.py
+++ b/examples/engine/py/recurring_timer.py
@@ -18,20 +18,19 @@
 # under the License.
 #
 
-import time
 from proton.reactors import Container, Handler
 
 class Recurring(Handler):
     def __init__(self, period):
         self.period = period
 
-    def on_start(self, event):
-        self.container = event.container
-        self.container.schedule(time.time() + self.period, subject=self)
+    def on_reactor_init(self, event):
+        self.container = event.reactor
+        self.container.schedule(self.period, self)
 
-    def on_timer(self, event):
+    def on_timer_task(self, event):
         print "Tick..."
-        self.container.schedule(time.time() + self.period, subject=self)
+        self.container.schedule(self.period, self)
 
 try:
     container = Container(Recurring(1.0))

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6136f112/examples/engine/py/test_examples.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/test_examples.py 
b/examples/engine/py/test_examples.py
index ed0aabc..b46b85b 100644
--- a/examples/engine/py/test_examples.py
+++ b/examples/engine/py/test_examples.py
@@ -18,6 +18,7 @@
 #
 
 import subprocess
+import time
 import unittest
 
 class ExamplesTest(unittest.TestCase):
@@ -48,8 +49,10 @@ class ExamplesTest(unittest.TestCase):
         expected = ["{'sequence': %iL}" % (i+1) for i in range(100)]
         self.assertEqual(actual, expected)
 
-    def test_client_server(self, client=['client.py'], server=['server.py']):
+    def test_client_server(self, client=['client.py'], server=['server.py'], 
sleep=0):
         s = subprocess.Popen(server, stderr=subprocess.STDOUT, 
stdout=subprocess.PIPE)
+        if sleep:
+            time.sleep(sleep)
         c = subprocess.Popen(client, stderr=subprocess.STDOUT, 
stdout=subprocess.PIPE)
         c.wait()
         s.terminate()
@@ -71,10 +74,10 @@ class ExamplesTest(unittest.TestCase):
         self.test_client_server(client=['sync_client.py'], 
server=['server_tx.py'])
 
     def test_client_server_direct(self):
-        self.test_client_server(client=['client.py', '-a', 
'localhost:8888/examples'], server=['server_direct.py'])
+        self.test_client_server(client=['client.py', '-a', 
'localhost:8888/examples'], server=['server_direct.py'], sleep=0.5)
 
     def test_sync_client_server_direct(self):
-        self.test_client_server(client=['sync_client.py', 
'localhost:8888/examples'], server=['server_direct.py'])
+        self.test_client_server(client=['sync_client.py', 
'localhost:8888/examples'], server=['server_direct.py'], sleep=0.5)
 
     def test_db_send_recv(self):
         self.maxDiff = None

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6136f112/proton-c/bindings/python/proton/handlers.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/handlers.py 
b/proton-c/bindings/python/proton/handlers.py
index f735647..56c6c0a 100644
--- a/proton-c/bindings/python/proton/handlers.py
+++ b/proton-c/bindings/python/proton/handlers.py
@@ -342,6 +342,10 @@ class EndpointStateHandler(Handler):
         elif self.peer_close_is_error:
             self.on_link_error(event)
 
+    def on_transport_closed(self, event):
+        if self.delegate:
+            dispatch(self.delegate, 'on_disconnected', event)
+
 class MessagingHandler(Handler, Acking):
     """
     A general purpose handler that makes the proton-c events somewhat
@@ -367,6 +371,14 @@ class MessagingHandler(Handler, Acking):
         EndpointStateHandler.print_error(event.link, "link")
         event.connection.close()
 
+    def on_reactor_init(self, event):
+        if hasattr(event.reactor, 'subclass'):
+            setattr(event, event.reactor.subclass.__name__.lower(), 
event.reactor)
+        self.on_start(event)
+
+    def on_start(self, event): pass
+    def on_disconnected(self, event): pass
+
 class TransactionHandler(object):
     """
     The interface for transaction handlers, i.e. objects that want to

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6136f112/proton-c/bindings/python/proton/reactors.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/reactors.py 
b/proton-c/bindings/python/proton/reactors.py
index 7da3531..8b4c532 100644
--- a/proton-c/bindings/python/proton/reactors.py
+++ b/proton-c/bindings/python/proton/reactors.py
@@ -179,189 +179,6 @@ from proton import wrappers as _wrappers
 _wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x))
 _wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x))
 
-class AmqpSocket(object):
-    """
-    Associates a transport with a connection and a socket and can be
-    used in an io loop to track the io for an AMQP 1.0 connection.
-    """
-
-    def __init__(self, conn, sock, events, heartbeat=None):
-        self.events = events
-        self.conn = conn
-        self.transport = Transport()
-        if heartbeat: self.transport.idle_timeout = heartbeat
-        self.transport.bind(self.conn)
-        self.socket = sock
-        self.socket.setblocking(0)
-        self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
-        self.write_done = False
-        self.read_done = False
-        self._closed = False
-
-    def accept(self, force_sasl=True, ssl_domain=None):
-        if ssl_domain:
-            self.ssl = SSL(self.transport, ssl_domain)
-        if force_sasl:
-            sasl = self.transport.sasl()
-            sasl.mechanisms("ANONYMOUS")
-            sasl.server()
-            sasl.done(SASL.OK)
-        #TODO: use SASL anyway if requested by peer
-        return self
-
-    def connect(self, host, port=None, username=None, password=None, 
force_sasl=True, ssl_domain=None):
-        if ssl_domain:
-            self.ssl = SSL(self.transport, ssl_domain)
-            self.ssl.peer_hostname = host
-        if username and password:
-            sasl = self.transport.sasl()
-            sasl.plain(username, password)
-        elif force_sasl:
-            sasl = self.transport.sasl()
-            sasl.mechanisms('ANONYMOUS')
-            sasl.client()
-        try:
-            self.socket.connect_ex((host, port or 5672))
-        except socket.gaierror, e:
-            raise ConnectionException("Cannot resolve '%s': %s" % (host, e))
-        return self
-
-    def _closed_cleanly(self):
-        return self.conn.state & Endpoint.LOCAL_CLOSED and self.conn.state & 
Endpoint.REMOTE_CLOSED
-
-    def closed(self):
-        if not self._closed and self.write_done and self.read_done:
-            self.close()
-            return True
-        else:
-            return False
-
-    def close(self):
-        self.socket.close()
-        self._closed = True
-
-    def fileno(self):
-        return self.socket.fileno()
-
-    def reading(self):
-        if self.read_done: return False
-        c = self.transport.capacity()
-        if c > 0:
-            return True
-        elif c < 0:
-            self.read_done = True
-        return False
-
-    def writing(self):
-        if self.write_done: return False
-        try:
-            p = self.transport.pending()
-            if p > 0:
-                return True
-            elif p < 0:
-                self.write_done = True
-                return False
-            else: # p == 0
-                return False
-        except TransportException, e:
-            self.write_done = True
-            return False
-
-    def readable(self):
-        c = self.transport.capacity()
-        if c > 0:
-            try:
-                data = self.socket.recv(c)
-                if data:
-                    self.transport.push(data)
-                else:
-                    if not self._closed_cleanly():
-                        self.read_done = True
-                        self.write_done = True
-                    else:
-                        self.transport.close_tail()
-            except TransportException, e:
-                logging.error("Error on read: %s" % e)
-                self.read_done = True
-            except socket.error, e:
-                logging.error("Error on recv: %s" % e)
-                self.read_done = True
-                self.write_done = True
-        elif c < 0:
-            self.read_done = True
-
-    def writable(self):
-        try:
-            p = self.transport.pending()
-            if p > 0:
-                data = self.transport.peek(p)
-                n = self.socket.send(data)
-                self.transport.pop(n)
-            elif p < 0:
-                self.write_done = True
-        except TransportException, e:
-            logging.error("Error on write: %s" % e)
-            self.write_done = True
-        except socket.error, e:
-            logging.error("Error on send: %s" % e)
-            self.write_done = True
-
-    def removed(self):
-        if not self._closed_cleanly():
-            self.transport.unbind()
-            self.events.dispatch(ApplicationEvent("disconnected", 
connection=self.conn))
-
-    def tick(self):
-        t = self.transport.tick(time.time())
-        if t: return t
-        else: return None
-
-class AmqpAcceptor:
-    """
-    Listens for incoming sockets, creates an AmqpSocket for them and
-    adds that to the list of tracked 'selectables'. The acceptor can
-    itself be added to an io loop.
-    """
-
-    def __init__(self, events, loop, host, port, ssl_domain=None):
-        self.events = events
-        self.loop = loop
-        self.socket = socket.socket()
-        self.socket.setblocking(0)
-        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-        self.socket.bind((host, port))
-        self.socket.listen(5)
-        self.ssl_domain = ssl_domain
-        self.loop.add(self)
-        self._closed = False
-
-    def closed(self):
-        if self._closed:
-            self.socket.close()
-            return True
-        else:
-            return False
-
-    def close(self):
-        self._closed = True
-
-    def fileno(self):
-        return self.socket.fileno()
-
-    def reading(self):
-        return not self._closed
-
-    def writing(self):
-        return False
-
-    def readable(self):
-        sock, addr = self.socket.accept()
-        if sock:
-            self.loop.add(AmqpSocket(self.events.connection(), sock, 
self.events).accept(ssl_domain=self.ssl_domain))
-
-    def removed(self): pass
-    def tick(self): return None
-
 
 class EventInjector(object):
     """
@@ -369,8 +186,7 @@ class EventInjector(object):
     external thread but handled on the event thread associated with
     the loop.
     """
-    def __init__(self, collector):
-        self.collector = collector
+    def __init__(self):
         self.queue = Queue.Queue()
         self.pipe = os.pipe()
         self._closed = False
@@ -379,9 +195,6 @@ class EventInjector(object):
         self.queue.put(event)
         os.write(self.pipe[1], "!")
 
-    def closed(self):
-        return self._closed and self.queue.empty()
-
     def close(self):
         self._closed = True
         os.write(self.pipe[1], "!")
@@ -389,98 +202,22 @@ class EventInjector(object):
     def fileno(self):
         return self.pipe[0]
 
-    def reading(self):
-        return not self.closed()
-
-    def writing(self):
-        return False
+    def on_selectable_init(self, event):
+        sel = event.context
+        sel.fileno(self.fileno())
+        sel.reading = True
+        event.reactor.update(sel)
 
-    def readable(self):
+    def on_selectable_readable(self, event):
         os.read(self.pipe[0], 512)
         while not self.queue.empty():
-            event = self.queue.get()
-            self.collector.put(event.context, event.type)
-
-    def removed(self): pass
-    def tick(self): return None
-
-class PQueue:
-
-    def __init__(self):
-        self.entries = []
-
-    def add(self, priority, task):
-        heappush(self.entries, (priority, task))
-
-    def peek(self):
-        if self.entries:
-            return nsmallest(1, self.entries)[0]
-        else:
-            return None
-
-    def pop(self):
-        if self.entries:
-            return heappop(self.entries)
-        else:
-            return None
-
-    def __nonzero__(self):
-        if self.entries:
-            return True
-        else:
-            return False
-
-class Timer:
-    def __init__(self, collector):
-        self.collector = collector
-        self.events = PQueue()
-
-    def schedule(self, deadline, event):
-        self.events.add(deadline, event)
-
-    def tick(self):
-        while self.events:
-            deadline, event = self.events.peek()
-            if time.time() > deadline:
-                self.events.pop()
-                self.collector.put(event.context, event.type)
-            else:
-                return deadline
-        return None
-
-    @property
-    def pending(self):
-        return bool(self.events)
-
-class Events(object):
-    def __init__(self, *handlers):
-        self.collector = Collector()
-        self.timer = Timer(self.collector)
-        self.handlers = handlers
-
-    def connection(self):
-        conn = Connection()
-        conn.collect(self.collector)
-        return conn
-
-    def process(self):
-        result = False
-        while True:
-            ev = self.collector.peek()
-            if ev:
-                self.dispatch(ev)
-                self.collector.pop()
-                result = True
-            else:
-                return result
-
-    def dispatch(self, event):
-        for h in self.handlers:
-            event.dispatch(h)
+            requested = self.queue.get()
+            event.reactor.push_event(requested.context, requested.type)
+        if self._closed:
+            s = event.context
+            s.terminate()
+            event.reactor.update(s)
 
-    @property
-    def empty(self):
-        return self.collector.peek() == None and not self.timer.pending
 
 class Names(object):
     def __init__(self, base=10000):
@@ -530,88 +267,6 @@ class StartEvent(ApplicationEvent):
         super(StartEvent, self).__init__("start")
         self.container = container
 
-def _min(a, b):
-    if a and b: return min(a, b)
-    elif a: return a
-    else: return b
-
-class SelectLoop(object):
-    """
-    An io loop based on select()
-    """
-    def __init__(self, events):
-        self.events = events
-        self.selectables = []
-        self._abort = False
-
-    def abort(self):
-        self._abort = True
-
-    def add(self, selectable):
-        self.selectables.append(selectable)
-
-    def remove(self, selectable):
-        self.selectables.remove(selectable)
-
-    @property
-    def redundant(self):
-        return self.events.empty and not self.selectables
-
-    @property
-    def aborted(self):
-        return self._abort
-
-    def run(self):
-        while not (self._abort or self.redundant):
-            self.do_work()
-
-    def do_work(self, timeout=None):
-        """@return True if some work was done, False if time-out expired"""
-        tick = self.events.timer.tick()
-
-        if self.events.process():
-            tick = self.events.timer.tick()
-            while self.events.process():
-                if self._abort: return
-                tick = self.events.timer.tick()
-            return True # Did work, let caller check their conditions, don't 
select.
-
-        stable = False
-        while not stable:
-            reading = []
-            writing = []
-            closed = []
-            for s in self.selectables:
-                if s.reading(): reading.append(s)
-                if s.writing(): writing.append(s)
-                if s.closed(): closed.append(s)
-                else: tick = _min(tick, s.tick())
-
-            for s in closed:
-                self.selectables.remove(s)
-                s.removed()
-            stable = len(closed) == 0
-
-        if self.redundant:
-            return False
-
-        if tick:
-            timeout = _min(tick - time.time(), timeout)
-        if timeout and timeout < 0:
-            timeout = 0
-        if reading or writing or timeout:
-            readable, writable, _ = select(reading, writing, [], timeout)
-            for s in self.selectables:
-                s.tick()
-            for s in readable:
-                s.readable()
-            for s in writable:
-                s.writable()
-
-            return bool(readable or writable)
-        else:
-            return False
-
 
 class Transaction(object):
     """
@@ -626,15 +281,6 @@ class Transaction(object):
         self.failed = False
         self._pending = []
         self.settle_before_discharge = settle_before_discharge
-        class InternalTransactionHandler(OutgoingMessageHandler):
-            def __init__(self):
-                super(InternalTransactionHandler, 
self).__init__(auto_settle=True)
-
-            def on_settled(self, event):
-                if hasattr(event.delivery, "transaction"):
-                    event.transaction = event.delivery.transaction
-                    event.delivery.transaction.handle_outcome(event)
-        self.internal_handler = InternalTransactionHandler()
         self.declare()
 
     def commit(self):
@@ -652,7 +298,6 @@ class Transaction(object):
 
     def _send_ctrl(self, descriptor, value):
         delivery = self.txn_ctrl.send(Message(body=Described(descriptor, 
value)))
-        delivery.context=self.internal_handler
         delivery.transaction = self
         return delivery
 
@@ -803,58 +448,83 @@ class SessionPerConnection(object):
         event.connection.close()
         self._default_session = None
 
+class GlobalOverrides(object):
+    """
+    Internal handler that offers each event to the connection's
+    _overrides handler (if any), falling back to the base handler.
+    """
+    def __init__(self, base):
+        self.base = base
+
+    def on_unhandled(self, name, event):
+        if not self._override(event):
+            event.dispatch(self.base)
+
+    def _override(self, event):
+        conn = event.connection
+        return conn and hasattr(conn, '_overrides') and 
event.dispatch(conn._overrides)
+
 class Connector(Handler):
     """
     Internal handler that triggers the necessary socket connect for an
     opened connection.
     """
-    def __init__(self, loop, ssl_domain=None):
-        self.loop = loop
-        self.ssl_domain = ssl_domain
-
-    def _get_ssl_domain(self, connection, scheme):
-        if hasattr(connection, 'ssl_domain'):
-            return connection.ssl_domain
-        elif scheme == 'amqps':
-            return self.ssl_domain
-        else:
-            return None
+    def __init__(self, connection):
+        self.connection = connection
+        self.address = None
+        self.heartbeat = None
+        self.reconnect = None
+        self.ssl_domain = None
 
     def _connect(self, connection):
-        url = connection.address.next()
-        logging.info("connecting to %s:%i" % (url.host, url.port))
-        heartbeat = None
-        if hasattr(connection, 'heartbeat'):
-            heartbeat = connection.heartbeat
-        s = AmqpSocket(connection, socket.socket(), self.loop.events, 
heartbeat=heartbeat)
-        s.connect(url.host, url.port, username=url.username, 
password=url.password, ssl_domain=self._get_ssl_domain(connection, url.scheme))
-        self.loop.add(s)
-        connection._pin = None #connection is now referenced by AmqpSocket, so 
no need for circular reference
+        url = self.address.next()
+        # IoHandler uses the hostname to determine where to try to connect to
+        connection.hostname = "%s:%i" % (url.host, url.port)
+        logging.info("connecting to %s..." % connection.hostname)
+
+        transport = Transport()
+        transport.bind(connection)
+        if self.heartbeat:
+            transport.idle_timeout = self.heartbeat
+        if url.scheme == 'amqps' and self.ssl_domain:
+            self.ssl = SSL(transport, self.ssl_domain)
+            self.ssl.peer_hostname = url.host
+        if url.username:
+            sasl = transport.sasl()
+            if url.username == 'anonymous':
+                sasl.mechanisms('ANONYMOUS')
+            else:
+                sasl.plain(url.username, url.password)
 
     def on_connection_local_open(self, event):
-        if hasattr(event.connection, "address"):
-            self._connect(event.connection)
+        self._connect(event.connection)
 
     def on_connection_remote_open(self, event):
-        if hasattr(event.connection, "reconnect"):
-            event.connection.reconnect.reset()
-
-    def on_disconnected(self, event):
-        if hasattr(event.connection, "reconnect"):
-            event.connection._pin = event.connection #no longer referenced by 
AmqpSocket, so pin in memory with circular reference
-            delay = event.connection.reconnect.next()
-            if delay == 0:
-                logging.info("Disconnected, reconnecting...")
-                self._connect(event.connection)
+        logging.info("connected to %s" % event.connection.hostname)
+        if self.reconnect:
+            self.reconnect.reset()
+            self.transport = None
+
+    def on_transport_closed(self, event):
+        if self.connection and self.connection.state & Endpoint.LOCAL_ACTIVE:
+            if self.reconnect:
+                event.transport.unbind()
+                delay = self.reconnect.next()
+                if delay == 0:
+                    logging.info("Disconnected, reconnecting...")
+                    self._connect(self.connection)
+                else:
+                    logging.info("Disconnected will try to reconnect after %s 
seconds" % delay)
+                    event.reactor.schedule(delay, self)
             else:
-                logging.info("Disconnected will try to reconnect after %s 
seconds" % delay)
-                self.loop.schedule(time.time() + delay, 
connection=event.connection, subject=self)
-        else:
-            logging.info("Disconnected")
+                logging.info("Disconnected")
+                self.connection = None
 
-    def on_timer(self, event):
-        if event.subject == self and event.connection:
-            self._connect(event.connection)
+    def on_timer_task(self, event):
+        self._connect(self.connection)
+
+    def on_connection_remote_close(self, event):
+        self.connection = None
 
 class Backoff(object):
     """
@@ -904,33 +574,33 @@ class SSLConfig(object):
         self.server.set_trusted_ca_db(certificate_db)
 
 
-class Container(object):
-    def __init__(self, *handlers):
-        self.ssl = SSLConfig()
-        h = [Connector(self, self.ssl.client), ScopedHandler()]
-        h.extend(handlers)
-        self.events = Events(*h)
-        self.loop = SelectLoop(self.events)
-        self.trigger = None
-        self.container_id = str(generate_uuid())
+class Container(Reactor):
+    def __init__(self, *handlers, **kwargs):
+        super(Container, self).__init__(*handlers, **kwargs)
+        if "impl" not in kwargs:
+            self.ssl = SSLConfig()
+            self.global_handler = GlobalOverrides(kwargs.get('global_handler', 
self.global_handler))
+            self.trigger = None
+            self.container_id = str(generate_uuid())
+            Wrapper.__setattr__(self, 'subclass', self.__class__)
 
     def connect(self, url=None, urls=None, address=None, handler=None, 
reconnect=None, heartbeat=None, ssl_domain=None):
-        conn = self.events.connection()
-        conn._pin = conn #circular reference until the open event gets handled
-        if handler:
-            conn.context = handler
+        conn = self.connection(handler)
         conn.container = self.container_id or str(generate_uuid())
-        conn.heartbeat = heartbeat
-        if url: conn.address = Urls([url])
-        elif urls: conn.address = Urls(urls)
-        elif address: conn.address = address
+
+        connector = Connector(conn)
+        conn._overrides = connector
+        if url: connector.address = Urls([url])
+        elif urls: connector.address = Urls(urls)
+        elif address: connector.address = address
         else: raise ValueError("One of url, urls or address required")
+        if heartbeat:
+            connector.heartbeat = heartbeat
         if reconnect:
-            conn.reconnect = reconnect
+            connector.reconnect = reconnect
         elif reconnect is None:
-            conn.reconnect = Backoff()
-        if ssl_domain:
-            conn.ssl_domain = ssl_domain
+            connector.reconnect = Backoff()
+        connector.ssl_domain = ssl_domain or self.ssl.client
         conn._session_policy = SessionPerConnection() #todo: make configurable
         conn.open()
         return conn
@@ -966,7 +636,7 @@ class Container(object):
         if target:
             snd.target.address = target
         if handler:
-            snd.context = handler
+            snd.handler = handler
         if tags:
             snd.tag_generator = tags
         _apply_link_options(options, snd)
@@ -987,46 +657,40 @@ class Container(object):
         if target:
             rcv.target.address = target
         if handler:
-            rcv.context = handler
+            rcv.handler = handler
         _apply_link_options(options, rcv)
         rcv.open()
         return rcv
 
     def declare_transaction(self, context, handler=None, 
settle_before_discharge=False):
         if not _get_attr(context, '_txn_ctrl'):
-            context._txn_ctrl = self.create_sender(context, None, 
name='txn-ctrl')
+            class InternalTransactionHandler(OutgoingMessageHandler):
+                def __init__(self):
+                    super(InternalTransactionHandler, 
self).__init__(auto_settle=True)
+
+                def on_settled(self, event):
+                    if hasattr(event.delivery, "transaction"):
+                        event.transaction = event.delivery.transaction
+                        event.delivery.transaction.handle_outcome(event)
+            context._txn_ctrl = self.create_sender(context, None, 
name='txn-ctrl', handler=InternalTransactionHandler())
             context._txn_ctrl.target.type = Terminus.COORDINATOR
             
context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
         return Transaction(context._txn_ctrl, handler, settle_before_discharge)
 
     def listen(self, url, ssl_domain=None):
-        url = Urls([url]).next()
+        url = Url(url)
         ssl_config = ssl_domain
         if not ssl_config and url.scheme == 'amqps':
             ssl_config = self.ssl_domain
-        return AmqpAcceptor(self.events, self, url.host, url.port, 
ssl_domain=ssl_config)
-
-    def schedule(self, deadline, connection=None, session=None, link=None, 
delivery=None, subject=None):
-        self.events.timer.schedule(deadline, ApplicationEvent("timer", 
connection, session, link, delivery, subject))
+        return self.acceptor(url.host, url.port)
 
     def get_event_trigger(self):
         if not self.trigger or self.trigger.closed():
-            self.trigger = EventInjector(self.events.collector)
-            self.add(self.trigger)
+            self.trigger = EventInjector()
+            self.selectable(self.trigger)
         return self.trigger
 
-    def add(self, selectable):
-        self.loop.add(selectable)
-
-    def remove(self, selectable):
-        self.loop.remove(selectable)
-
-    def run(self):
-        self.events.dispatch(StartEvent(self))
-        self.loop.run()
-
-    def stop(self):
-        self.loop.abort()
-
     def do_work(self, timeout=None):
-        return self.loop.do_work(timeout)
+        if timeout:
+            self.timeout = timeout
+        return self.process()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6136f112/proton-c/bindings/python/proton/utils.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/utils.py 
b/proton-c/bindings/python/proton/utils.py
index 6ea5b4b..924f3d9 100644
--- a/proton-c/bindings/python/proton/utils.py
+++ b/proton-c/bindings/python/proton/utils.py
@@ -19,8 +19,8 @@
 import collections, Queue, socket, time, threading
 from proton import ConnectionException, Delivery, Endpoint, Handler, 
LinkException, Message
 from proton import ProtonException, Timeout, Url
-from proton.reactors import AmqpSocket, Container, Events, SelectLoop
-from proton.handlers import Acking, MessagingHandler, ScopedHandler, 
IncomingMessageHandler
+from proton.reactors import Container
+from proton.handlers import MessagingHandler, IncomingMessageHandler
 
 def utf8(s):
     if isinstance(s, unicode):
@@ -190,6 +190,7 @@ class BlockingConnection(Handler):
     def __init__(self, url, timeout=None, container=None, ssl_domain=None):
         self.timeout = timeout
         self.container = container or Container()
+        self.container.start()
         self.url = Url(utf8(url)).defaults()
         self.conn = self.container.connect(url=self.url, handler=self, 
ssl_domain=ssl_domain)
         self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT),
@@ -216,7 +217,7 @@ class BlockingConnection(Handler):
 
     def run(self):
         """ Hand control over to the event loop (e.g. if waiting indefinitely 
for incoming messages) """
-        self.container.run()
+        while self.container.process(): pass
 
     def wait(self, condition, timeout=False, msg=None):
         """Call do_work until condition() is true"""
@@ -224,11 +225,12 @@ class BlockingConnection(Handler):
             timeout = self.timeout
         if timeout is None:
             while not condition():
-                self.container.do_work()
+                self.container.process()
         else:
             deadline = time.time() + timeout
             while not condition():
-                if not self.container.do_work(deadline - time.time()):
+                self.container.process()
+                if deadline < time.time():
                     txt = "Connection %s timed out" % self.url
                     if msg: txt += ": " + msg
                     raise Timeout(txt)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to