Repository: qpid-proton
Updated Branches:
  refs/heads/master 763a0798d -> 67d4d7490


PROTON-1858: Rework wrapped C handlers in pure python
- Rewritten Handshaker and FlowController in python
- CFlowController and CHandshaker are now aliases for
  Flowcontroller and Handshaker


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/67d4d749
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/67d4d749
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/67d4d749

Branch: refs/heads/master
Commit: 67d4d749033c1eb03b2f1bff20518c8050d0ccdb
Parents: 763a079
Author: Andrew Stitcher <astitc...@apache.org>
Authored: Fri May 25 18:18:55 2018 -0400
Committer: Andrew Stitcher <astitc...@apache.org>
Committed: Tue Jun 12 10:43:09 2018 -0400

----------------------------------------------------------------------
 python/proton/handlers.py            | 78 +++++++++++++++++++++++++++----
 tests/python/proton_tests/common.py  |  4 +-
 tests/python/proton_tests/reactor.py |  4 +-
 3 files changed, 72 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/67d4d749/python/proton/handlers.py
----------------------------------------------------------------------
diff --git a/python/proton/handlers.py b/python/proton/handlers.py
index 15e31a0..b30408e 100644
--- a/python/proton/handlers.py
+++ b/python/proton/handlers.py
@@ -415,7 +415,7 @@ class MessagingHandler(Handler, Acking):
     def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, 
peer_close_is_error=False):
         self.handlers = []
         if prefetch:
-            self.handlers.append(CFlowController(prefetch))
+            self.handlers.append(FlowController(prefetch))
         self.handlers.append(EndpointStateHandler(peer_close_is_error, 
weakref.proxy(self)))
         self.handlers.append(IncomingMessageHandler(auto_accept, 
weakref.proxy(self)))
         self.handlers.append(OutgoingMessageHandler(auto_settle, 
weakref.proxy(self)))
@@ -599,21 +599,79 @@ class TransactionalClientHandler(MessagingHandler, 
TransactionHandler):
             super(TransactionalClientHandler, self).accept(delivery)
 
 
-from ._events import WrappedHandler
-from cproton import pn_flowcontroller, pn_handshaker, pn_iohandler
-
+class FlowController(Handler):
+    def __init__(self, window=1024):
+        self._window = window
+        self._drained = 0
 
-class CFlowController(WrappedHandler):
+    def on_link_local_open(self, event):
+        self._flow(event.link)
 
-    def __init__(self, window=1024):
-        WrappedHandler.__init__(self, lambda: pn_flowcontroller(window))
+    def on_link_remote_open(self, event):
+        self._flow(event.link)
 
+    def on_link_flow(self, event):
+        self._flow(event.link)
 
-class CHandshaker(WrappedHandler):
+    def on_delivery(self, event):
+        self._flow(event.link)
+
+    def _flow(self, link):
+        if link.is_receiver:
+            self._drained += link.drained()
+            if self._drained == 0:
+                delta = self._window - link.credit
+                link.flow(delta)
+
+
+class Handshaker(Handler):
+
+    @staticmethod
+    def on_connection_remote_open(event):
+        conn = event.connection
+        if conn.state & Endpoint.LOCAL_UNINIT:
+            conn.open()
+
+    @staticmethod
+    def on_session_remote_open(event):
+        ssn = event.session
+        if ssn.state() & Endpoint.LOCAL_UNINIT:
+            ssn.open()
+
+    @staticmethod
+    def on_link_remote_open(event):
+        link = event.link
+        if link.state & Endpoint.LOCAL_UNINIT:
+            link.source.copy(link.remote_source)
+            link.target.copy(link.remote_target)
+            link.open()
+
+    @staticmethod
+    def on_connection_remote_close(event):
+        conn = event.connection
+        if not conn.state & Endpoint.LOCAL_CLOSED:
+            conn.close()
+
+    @staticmethod
+    def on_session_remote_close(event):
+        ssn = event.session
+        if not ssn.state & Endpoint.LOCAL_CLOSED:
+            ssn.close()
+
+    @staticmethod
+    def on_link_remote_close(event):
+        link = event.link
+        if not link.state & Endpoint.LOCAL_CLOSED:
+            link.close()
+
+
+# Back compatibility definitions
+CFlowController = FlowController
+CHandshaker = Handshaker
 
-    def __init__(self):
-        WrappedHandler.__init__(self, pn_handshaker)
 
+from ._events import WrappedHandler
+from cproton import pn_iohandler
 
 class IOHandler(WrappedHandler):
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/67d4d749/tests/python/proton_tests/common.py
----------------------------------------------------------------------
diff --git a/tests/python/proton_tests/common.py 
b/tests/python/proton_tests/common.py
index ed65e2f..90ba34c 100644
--- a/tests/python/proton_tests/common.py
+++ b/tests/python/proton_tests/common.py
@@ -34,7 +34,7 @@ from subprocess import Popen,PIPE,STDOUT
 import sys, os, subprocess
 from proton import SASL, SSL
 from proton.reactor import Container
-from proton.handlers import CHandshaker, CFlowController
+from proton.handlers import Handshaker, FlowController
 from string import Template
 
 def free_tcp_ports(count=1):
@@ -195,7 +195,7 @@ class TestServer(object):
       self.host = kwargs["host"]
     if "port" in kwargs:
       self.port = kwargs["port"]
-    self.handlers = [CFlowController(10), CHandshaker()]
+    self.handlers = [FlowController(10), Handshaker()]
     self.thread = Thread(name="server-thread", target=self.run)
     self.thread.daemon = True
     self.running = True

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/67d4d749/tests/python/proton_tests/reactor.py
----------------------------------------------------------------------
diff --git a/tests/python/proton_tests/reactor.py 
b/tests/python/proton_tests/reactor.py
index 8a3a6af..1b1e2fc 100644
--- a/tests/python/proton_tests/reactor.py
+++ b/tests/python/proton_tests/reactor.py
@@ -22,7 +22,7 @@ import time
 import sys
 from .common import Test, SkipTest, TestServer, free_tcp_port, 
ensureCanTestExtendedSASL
 from proton.reactor import Container, Reactor, ApplicationEvent, EventInjector
-from proton.handlers import CHandshaker, MessagingHandler
+from proton.handlers import Handshaker, MessagingHandler
 from proton import Handler, Url
 
 class Barf(Exception):
@@ -56,7 +56,7 @@ class BarfOnFinal:
     def on_reactor_final(self, event):
         raise Barf()
     
-class BarfOnFinalDerived(CHandshaker):
+class BarfOnFinalDerived(Handshaker):
     init = False
     
     def on_reactor_init(self, event):


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to