Initial set of engine examples along with supporting additions to the library.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/34e64e32 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/34e64e32 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/34e64e32 Branch: refs/heads/master Commit: 34e64e3248b67971b701d990709920fa690c12b4 Parents: 9a72a30 Author: Gordon Sim <[email protected]> Authored: Fri Dec 5 20:49:54 2014 +0000 Committer: Gordon Sim <[email protected]> Committed: Fri Dec 12 11:04:53 2014 +0000 ---------------------------------------------------------------------- examples/engine/py/README | 155 ++++ examples/engine/py/abstract_server.py | 35 + examples/engine/py/client.py | 59 ++ examples/engine/py/client_http.py | 110 +++ examples/engine/py/common.py | 699 ++++++++++++++++ examples/engine/py/db_common.py | 93 +++ examples/engine/py/db_ctrl.py | 46 ++ examples/engine/py/db_recv.py | 54 ++ examples/engine/py/db_send.py | 85 ++ examples/engine/py/helloworld.py | 45 + examples/engine/py/helloworld_blocking.py | 35 + examples/engine/py/helloworld_direct.py | 47 ++ examples/engine/py/helloworld_direct_tornado.py | 52 ++ examples/engine/py/helloworld_tornado.py | 49 ++ examples/engine/py/proton_server.py | 61 ++ examples/engine/py/proton_tornado.py | 70 ++ examples/engine/py/recurring_timer.py | 43 + examples/engine/py/recurring_timer_tornado.py | 44 + examples/engine/py/selected_recv.py | 40 + examples/engine/py/server.py | 56 ++ examples/engine/py/server_tx.py | 77 ++ examples/engine/py/simple_recv.py | 40 + examples/engine/py/simple_send.py | 53 ++ examples/engine/py/sync_client.py | 88 ++ examples/engine/py/tx_recv.py | 61 ++ examples/engine/py/tx_recv_interactive.py | 83 ++ examples/engine/py/tx_send.py | 75 ++ examples/engine/py/tx_send_sync.py | 76 ++ proton-c/bindings/python/CMakeLists.txt | 20 +- proton-c/bindings/python/proton/__init__.py | 63 +- proton-c/bindings/python/proton/handlers.py | 440 ++++++++++ 
proton-c/bindings/python/proton/reactors.py | 827 +++++++++++++++++++ proton-c/bindings/python/proton/utils.py | 114 +++ 33 files changed, 3870 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/README ---------------------------------------------------------------------- diff --git a/examples/engine/py/README b/examples/engine/py/README new file mode 100644 index 0000000..a361fc4 --- /dev/null +++ b/examples/engine/py/README @@ -0,0 +1,155 @@ +Most (though not all) of the current examples require a broker or +similar intermediary that supports the AMQP 1.0 protocol, allows +anonymous connections and accepts links to and from a node named +'examples'. + +------------------------------------------------------------------ + +helloworld.py + +Basic example that connects to an intermediary on localhost:5672, +establishes a subscription from the 'examples' node on that +intermediary, then creates a sending link to the same node and sends +one message. On receiving the message back via the subscription, the +connection is closed. + +helloworld_blocking.py + +The same as the basic helloworld.py, but using a +synchronous/sequential style wrapper on top of the +asynchronous/reactive API. The purpose of this example is just to show +how different functionality can be easily layered should it be +desired. + +helloworld_direct.py + +A variant of the basic helloworld example, that does not use an +intermediary, but listens for incoming connections itself. It +establishes a connection to itself with a link over which a single +message is sent. This demonstrates the ease with which a simple daemon +can be built using the API. 
+ +helloworld_tornado.py +helloworld_direct_tornado.py + +These are variants of the helloworld.py and helloworld_direct.py +examples that use the event loop from the tornado library, rather than +that provided within proton itself and demonstrate how proton can be +used with external loops. + +------------------------------------------------------------------- + +simple_send.py + +An example of sending a fixed number of messages and tracking their +(asynchronous) acknowledgement. Handles disconnection while +maintaining an at-least-once guarantee (there may be duplicates, but +no message in the sequence should be lost). Messages are sent through +the 'examples' node on an intermediary accessible on port 5672 on +localhost. + +simple_recv.py + +Subscribes to the 'examples' node on an intermediary accessible on port 5672 on +localhost. Simply prints out the body of received messages. + +db_send.py + +A more realistic sending example, where the messages come from records +in a simple database table. On being acknowledged the records can be +deleted from the table. The database access is done in a separate +thread, so as not to block the event thread during data +access. Messages are sent through the 'examples' node on an +intermediary accessible on port 5672 on localhost. + +db_recv.py + +A receiving example that records messages received from the 'examples' +node on localhost:5672 in a database table and only acknowledges them +when the insert completes. Database access is again done in a separate +thread from the event loop. + +db_ctrl.py + +A utility for setting up the database tables for the two examples +above. Takes two arguments, the action to perform and the name of the +database on which to perform it. The database used by db_send.py is +src_db, that by db_recv.py is dst_db. The valid actions are 'init', +which creates the table, 'list' which displays the contents and +'insert' which inserts records from standard-in and is used to +populate src_db, e.g. 
for i in `seq 1 50`; do echo "Message-$i"; done +| ./db_ctrl.py insert src_db. + +tx_send.py + +A sender that sends messages in atomic batches using local +transactions (this example does not persist the messages in any way). + +tx_send_sync.py + +A variant of the former example that waits for all messages in a batch +to be acknowledged before committing. Used only to work around an +ordering issue in proton that affected qpidd. + +tx_recv.py + +A receiver example that accepts batches of messages using local +transactions. + +tx_recv_interactive.py + +A testing utility that allows interactive control of the +transactions. Actions are keyed in to the console, 'fetch' will +request another message, 'abort' will abort the transaction, 'commit' +will commit it. + +The various send/recv examples can be mixed and matched if desired. + +------------------------------------------------------------------- + +client.py + +The client part of a request-response example. Sends requests and +prints out responses. Requires an intermediary that supports the AMQP +1.0 dynamic nodes on which the responses are received. The requests +are sent through the 'examples' node. + +server.py + +The server part of a request-response example, that receives requests +via the examples node, converts the body to uppercase and sends the +result back to the indicated reply address. + +sync_client.py + +A variant of the client part, that uses a blocking/synchronous style +instead of the reactive/asynchronous style. + +client_http.py + +A variant of the client part that takes the input to be submitted in +the request over HTTP (point your browser to localhost:8888/client) + +server_tx.py + +A variant of the server part that consumes the request and sends out +the response atomically in a local transaction. + +------------------------------------------------------------------- + +selected_recv.py + +An example that uses a selector filter. 
+ +------------------------------------------------------------------- + +recurring_timer.py + +An example showing a simple timer event. + +recurring_timer_tornado.py + +A variant of the above that uses the tornado eventloop instead. + +------------------------------------------------------------------- + http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/abstract_server.py ---------------------------------------------------------------------- diff --git a/examples/engine/py/abstract_server.py b/examples/engine/py/abstract_server.py new file mode 100755 index 0000000..2d0de32 --- /dev/null +++ b/examples/engine/py/abstract_server.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +from proton_server import Server + +class Application(Server): + def __init__(self, host, address): + super(Application, self).__init__(host, address) + + def on_request(self, request, reply_to): + response = request.upper() + self.send(response, reply_to) + print "Request from: %s" % reply_to + +try: + Application("localhost:5672", "examples").run() +except KeyboardInterrupt: pass + http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/client.py ---------------------------------------------------------------------- diff --git a/examples/engine/py/client.py b/examples/engine/py/client.py new file mode 100755 index 0000000..d1e2706 --- /dev/null +++ b/examples/engine/py/client.py @@ -0,0 +1,59 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +from proton import Message +from proton.handlers import MessagingHandler +from proton.reactors import Container + +class Client(MessagingHandler): + def __init__(self, host, address, requests): + super(Client, self).__init__() + self.host = host + self.address = address + self.requests = requests + + def on_start(self, event): + self.conn = event.container.connect(self.host) + self.sender = event.container.create_sender(self.conn, self.address) + self.receiver = event.container.create_receiver(self.conn, None, dynamic=True) + + def next_request(self): + if self.receiver.remote_source.address: + req = Message(reply_to=self.receiver.remote_source.address, body=self.requests[0]) + self.sender.send_msg(req) + + def on_link_opened(self, event): + if event.receiver == self.receiver: + self.next_request() + + def on_message(self, event): + print "%s => %s" % (self.requests.pop(0), event.message.body) + if self.requests: + self.next_request() + else: + self.conn.close() + +REQUESTS= ["Twas brillig, and the slithy toves", + "Did gire and gymble in the wabe.", + "All mimsy were the borogroves,", + "And the mome raths outgrabe."] + +Container(Client("localhost:5672", "examples", REQUESTS)).run() + http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/client_http.py ---------------------------------------------------------------------- diff --git a/examples/engine/py/client_http.py b/examples/engine/py/client_http.py new file mode 100755 index 0000000..5202f8d --- /dev/null +++ b/examples/engine/py/client_http.py @@ -0,0 +1,110 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from proton import Message +from proton.handlers import MessagingHandler +from proton_tornado import TornadoLoop +from tornado.ioloop import IOLoop +import tornado.web + +class Client(MessagingHandler): + def __init__(self, host, address): + super(Client, self).__init__() + self.host = host + self.address = address + self.sent = [] + self.pending = [] + self.reply_address = None + self.sender = None + self.receiver = None + + def on_start(self, event): + conn = event.container.connect(self.host) + self.sender = event.container.create_sender(conn, self.address) + self.receiver = event.container.create_receiver(conn, None, dynamic=True) + + def on_link_opened(self, event): + if event.receiver == self.receiver: + self.reply_address = event.link.remote_source.address + self.do_request() + + def on_credit(self, event): + self.do_request() + + def on_message(self, event): + if self.sent: + request, handler = self.sent.pop(0) + print "%s => %s" % (request, event.message.body) + handler(event.message.body) + self.do_request() + + def do_request(self): + if self.pending and self.reply_address and self.sender.credit: + request, handler = self.pending.pop(0) + self.sent.append((request, handler)) + req = Message(reply_to=self.reply_address, body=request) + self.sender.send_msg(req) + + def request(self, body, handler): + self.pending.append((body, handler)) + self.do_request() + +class ExampleHandler(tornado.web.RequestHandler): + def initialize(self, client): + self.client = client + + def get(self): + self._write_open() + self._write_form() + 
self._write_close() + + @tornado.web.asynchronous + def post(self): + client.request(self.get_body_argument("message"), lambda x: self.on_response(x)) + + def on_response(self, body): + self.set_header("Content-Type", "text/html") + self._write_open() + self._write_form() + self.write("Response: " + body) + self._write_close() + self.finish() + + def _write_open(self): + self.write('<html><body>') + + def _write_close(self): + self.write('</body></html>') + + def _write_form(self): + self.write('<form action="/client" method="POST">' + 'Request: <input type="text" name="message">' + '<input type="submit" value="Submit">' + '</form>') + + +client = Client("localhost:5672", "examples") +loop = TornadoLoop(client) +app = tornado.web.Application([tornado.web.url(r"/client", ExampleHandler, dict(client=client))]) +app.listen(8888) +try: + loop.run() +except KeyboardInterrupt: + loop.stop() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/common.py ---------------------------------------------------------------------- diff --git a/examples/engine/py/common.py b/examples/engine/py/common.py new file mode 100644 index 0000000..d4d9a69 --- /dev/null +++ b/examples/engine/py/common.py @@ -0,0 +1,699 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +# +import errno, os, random, select, time, traceback +from proton import * +from socket import * +from threading import Thread +from heapq import heappush, heappop, nsmallest + +class Selectable: + + def __init__(self, transport, socket): + self.transport = transport + self.socket = socket + self.write_done = False + self.read_done = False + + def closed(self): + if self.write_done and self.read_done: + self.socket.close() + return True + else: + return False + + def fileno(self): + return self.socket.fileno() + + def reading(self): + if self.read_done: return False + c = self.transport.capacity() + if c > 0: + return True + elif c < 0: + self.read_done = True + return False + + def writing(self): + if self.write_done: return False + try: + p = self.transport.pending() + if p > 0: + return True + elif p < 0: + self.write_done = True + return False + except TransportException, e: + self.write_done = True + return False + + def readable(self): + c = self.transport.capacity() + if c > 0: + try: + data = self.socket.recv(c) + if data: + self.transport.push(data) + else: + self.transport.close_tail() + except error, e: + print "read error", e + self.transport.close_tail() + self.read_done = True + elif c < 0: + self.read_done = True + + def writable(self): + try: + p = self.transport.pending() + if p > 0: + data = self.transport.peek(p) + n = self.socket.send(data) + self.transport.pop(n) + elif p < 0: + self.write_done = True + except error, e: + print "write error", e + self.transport.close_head() + self.write_done = True + + def tick(self, now): + return self.transport.tick(now) + +class Acceptor: + + def __init__(self, driver, host, port): + self.driver = driver + self.socket = socket() + self.socket.setblocking(0) + self.socket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) + self.socket.bind((host, port)) + self.socket.listen(5) + self.driver.add(self) + + def 
closed(self): + return False + + def fileno(self): + return self.socket.fileno() + + def reading(self): + return True + + def writing(self): + return False + + def readable(self): + sock, addr = self.socket.accept() + sock.setblocking(0) + print "Incoming Connection:", addr + if sock: + conn = Connection() + conn.collect(self.driver.collector) + transport = Transport() + transport.bind(conn) + sasl = transport.sasl() + sasl.mechanisms("ANONYMOUS") + sasl.server() + sasl.done(SASL.OK) + sel = Selectable(transport, sock) + self.driver.add(sel) + + def tick(self, now): + return None + +class Interrupter: + + def __init__(self): + self.read, self.write = os.pipe() + + def fileno(self): + return self.read + + def readable(self): + os.read(self.read, 1024) + + def interrupt(self): + os.write(self.write, 'x') + +class PQueue: + + def __init__(self): + self.entries = [] + + def add(self, task, priority): + heappush(self.entries, (priority, task)) + + def peek(self): + if self.entries: + return nsmallest(1, self.entries)[0] + else: + return None + + def pop(self): + if self.entries: + return heappop(self.entries) + else: + return None + + def __nonzero__(self): + if self.entries: + return True + else: + return False + +import proton + +TIMER_EVENT = proton.EventType(10000, "on_timer") + +class Timer: + + def __init__(self, collector): + self.collector = collector + self.tasks = PQueue() + + def schedule(self, task, deadline): + self.tasks.add(task, deadline) + + def tick(self, now): + while self.tasks: + deadline, task = self.tasks.peek() + if now > deadline: + self.tasks.pop() + self.collector.put(task, TIMER_EVENT) + else: + return deadline + +def _dispatch(ev, handler): + if ev.clazz == "pn_delivery" and ev.context.released: + return + else: + ev.dispatch(handler) + +def _expand(handlers): + result = [] + for h in handlers: + if hasattr(h, "handlers"): + result.extend(h.handlers) + else: + result.append(h) + return result + +class Driver(Handler): + + def __init__(self, 
*handlers): + self.collector = Collector() + self.handlers = _expand(handlers) + self.interrupter = Interrupter() + self.timer = Timer(self.collector) + self.selectables = [] + self.now = None + self.deadline = None + self._abort = False + self._exit = False + self._thread = Thread(target=self.run) + self._thread.setDaemon(True) + + def schedule(self, task, timeout): + self.timer.schedule(task, self.now + timeout) + + def abort(self): + self._abort = True + + def exit(self): + self._exit = True + + def wakeup(self): + self.interrupter.interrupt() + + def start(self): + self._thread.start() + + def join(self): + self._thread.join() + + def _init_deadline(self): + self.now = time.time() + self.deadline = None + + def _update_deadline(self, t): + if t is None or t < self.now: return + if self.deadline is None or t < self.deadline: + self.deadline = t + + @property + def _timeout(self): + if self.deadline is None: + return None + else: + return self.deadline - self.now + + def run(self): + self._init_deadline() + for h in self.handlers: + dispatch(h, "on_start", self) + + while True: + self._init_deadline() + + while True: + self.process_events() + if self._abort: return + self._update_deadline(self.timer.tick(self.now)) + count = self.process_events() + if self._abort: return + if not count: + break + + reading = [self.interrupter] + writing = [] + + for s in self.selectables[:]: + if s.reading(): reading.append(s) + if s.writing(): writing.append(s) + self._update_deadline(s.tick(self.now)) + if s.closed(): self.selectables.remove(s) + + if self._exit and not self.selectables: return + + try: + readable, writable, _ = select.select(reading, writing, [], self._timeout) + except select.error, (err, errtext): + if err == errno.EINTR: + continue + else: + raise + + for s in readable: + s.readable() + for s in writable: + s.writable() + + def process_events(self): + count = 0 + + quiesced = False + while True: + ev = self.collector.peek() + if ev: + count += 1 + quiesced 
= False + _dispatch(ev, self) + for h in self.get_handlers(ev.context): + _dispatch(ev, h) + self.collector.pop() + elif quiesced: + return count + else: + for h in self.handlers: + dispatch(h, "on_quiesced", self) + quiesced = True + + return count + + getters = { + Transport: lambda x: x.connection, + Delivery: lambda x: x.link, + Sender: lambda x: x.session, + Receiver: lambda x: x.session, + Session: lambda x: x.connection, + } + + def get_handlers(self, context): + if hasattr(context, "handlers"): + return context.handlers + elif context.__class__ in self.getters: + parent = self.getters[context.__class__](context) + return self.get_handlers(parent) + else: + return self.handlers + + def on_connection_local_open(self, event): + conn = event.context + if conn.state & Endpoint.REMOTE_UNINIT: + self._connect(conn) + + def _connect(self, conn): + transport = Transport() + transport.idle_timeout = 300 + sasl = transport.sasl() + sasl.mechanisms("ANONYMOUS") + sasl.client() + transport.bind(conn) + sock = socket() + sock.setblocking(0) + hostport = conn.hostname.split(":", 1) + host = hostport[0] + if len(hostport) > 1: + port = int(hostport[1]) + else: + port = 5672 + sock.connect_ex((host, port)) + selectable = Selectable(transport, sock) + self.add(selectable) + + def on_timer(self, event): + event.context() + + def connection(self, *handlers): + conn = Connection() + if handlers: + conn.handlers = _expand(handlers) + conn.collect(self.collector) + return conn + + def acceptor(self, host, port): + return Acceptor(self, host, port) + + def add(self, selectable): + self.selectables.append(selectable) + +class Handshaker(Handler): + + def on_connection_remote_open(self, event): + conn = event.context + if conn.state & Endpoint.LOCAL_UNINIT: + conn.open() + + def on_session_remote_open(self, event): + ssn = event.context + if ssn.state & Endpoint.LOCAL_UNINIT: + ssn.open() + + def on_link_remote_open(self, event): + link = event.context + if link.state & 
Endpoint.LOCAL_UNINIT: + link.source.copy(link.remote_source) + link.target.copy(link.remote_target) + link.open() + + def on_connection_remote_close(self, event): + conn = event.context + if not (conn.state & Endpoint.LOCAL_CLOSED): + conn.close() + + def on_session_remote_close(self, event): + ssn = event.context + if not (ssn.state & Endpoint.LOCAL_CLOSED): + ssn.close() + + def on_link_remote_close(self, event): + link = event.context + if not (link.state & Endpoint.LOCAL_CLOSED): + link.close() + +class FlowController(Handler): + + def __init__(self, window): + self.window = window + + def top_up(self, link): + delta = self.window - link.credit + link.flow(delta) + + def on_link_local_open(self, event): + link = event.context + if link.is_receiver: + self.top_up(link) + + def on_link_remote_open(self, event): + link = event.context + if link.is_receiver: + self.top_up(link) + + def on_link_flow(self, event): + link = event.context + if link.is_receiver: + self.top_up(link) + + def on_delivery(self, event): + delivery = event.context + if delivery.link.is_receiver: + self.top_up(delivery.link) + +class Row: + + def __init__(self): + self.links = set() + + def add(self, link): + self.links.add(link) + + def discard(self, link): + self.links.discard(link) + + def choose(self): + if self.links: + return random.choice(list(self.links)) + else: + return None + + def __iter__(self): + return iter(self.links) + + def __nonzero__(self): + return bool(self.links) + + +class Router(Handler): + + EMPTY = Row() + + def __init__(self): + self._outgoing = {} + self._incoming = {} + + def incoming(self, address): + return self._incoming.get(address, self.EMPTY) + + def outgoing(self, address): + return self._outgoing.get(address, self.EMPTY) + + def address(self, link): + if link.is_sender: + return link.source.address or link.target.address + else: + return link.target.address + + def table(self, link): + if link.is_sender: + return self._outgoing + else: + return 
self._incoming + + def add(self, link): + address = self.address(link) + table = self.table(link) + row = table.get(address) + if row is None: + row = Row() + table[address] = row + row.add(link) + + def remove(self, link): + address = self.address(link) + table = self.table(link) + row = table.get(address) + if row is not None: + row.discard(link) + if not row: + del table[address] + + def on_link_local_open(self, event): + self.add(event.context) + + def on_link_local_close(self, event): + self.remove(event.context) + + def on_link_final(self, event): + self.remove(event.context) + +class Pool(Handler): + + def __init__(self, collector, router=None): + self.collector = collector + self._connections = {} + if router: + self.outgoing_resolver = lambda address: router.outgoing(address).choose() + self.incoming_resolver = lambda address: router.incoming(address).choose() + else: + self.outgoing_resolver = lambda address: None + self.incoming_resolver = lambda address: None + + def resolve(self, remote, local, resolver, constructor): + link = resolver(remote) + if link is None: + host = remote[2:].split("/", 1)[0] + conn = self._connections.get(host) + if conn is None: + conn = Connection() + conn.collect(self.collector) + conn.hostname = host + conn.open() + self._connections[host] = conn + + ssn = conn.session() + ssn.open() + link = constructor(ssn, remote, local) + link.open() + return link + + def on_transport_closed(self, event): + transport = event.context + conn = transport.connection + del self._connections[conn.hostname] + + def outgoing(self, target, source=None): + return self.resolve(target, source, self.outgoing_resolver, self.new_outgoing) + + def incoming(self, source, target=None): + return self.resolve(source, target, self.incoming_resolver, self.new_incoming) + + def new_outgoing(self, ssn, remote, local): + snd = ssn.sender("%s-%s" % (local, remote)) + snd.source.address = local + snd.target.address = remote + return snd + + def new_incoming(self, 
ssn, remote, local): + rcv = ssn.receiver("%s-%s" % (remote, local)) + rcv.source.address = remote + rcv.target.address = local + return rcv + +class MessageDecoder(Handler): + + def __init__(self, delegate): + self.__delegate = delegate + + def on_start(self, drv): + try: + self.__delegate + except AttributeError: + self.__delegate = self + self.__message = Message() + + def on_delivery(self, event): + dlv = event.context + if dlv.link.is_receiver and not dlv.partial: + encoded = dlv.link.recv(dlv.pending) + self.__message.decode(encoded) + try: + dispatch(self.__delegate, "on_message", dlv.link, self.__message) + dlv.update(Delivery.ACCEPTED) + except: + dlv.update(Delivery.REJECTED) + traceback.print_exc() + dlv.settle() + +class Address: + + def __init__(self, st): + self.st = st + + @property + def host(self): + return self.st[2:].split("/", 1)[0] + + @property + def path(self): + parts = self.st[2:].split("/", 1) + if len(parts) == 2: + return parts[1] + else: + return "" + + def __repr__(self): + return "Address(%r)" % self.st + + def __str__(self): + return self.st + +class SendQueue(Handler): + + def __init__(self, address): + self.address = Address(address) + self.messages = [] + self.sent = 0 + + def on_start(self, drv): + self.driver = drv + self.connect() + + def connect(self): + self.conn = self.driver.connection(self) + self.conn.hostname = self.address.host + ssn = self.conn.session() + snd = ssn.sender(str(self.address)) + snd.target.address = str(self.address) + ssn.open() + snd.open() + self.conn.open() + self.link = snd + + def put(self, message): + self.messages.append(message.encode()) + if self.link: + self.pump(self.link) + + def on_link_flow(self, event): + link = event.context + self.pump(link) + + def pump(self, link): + while self.messages and link.credit > 0: + dlv = link.delivery(str(self.sent)) + bytes = self.messages.pop(0) + link.send(bytes) + dlv.settle() + self.sent += 1 + + def on_transport_closed(self, event): + conn = 
event.context.connection + self.conn = None + self.link = None + self.driver.schedule(self.connect, 1) + +# XXX: terrible name for this +class RecvQueue(Handler): + + def __init__(self, address, delegate): + self.address = Address(address) + self.delegate = delegate + self.decoder = MessageDecoder(self.delegate) + self.handlers = [FlowController(1024), self.decoder, self] + + def on_start(self, drv): + self.driver = drv + self.decoder.on_start(drv) + self.connect() + + def connect(self): + self.conn = self.driver.connection(self) + self.conn.hostname = self.address.host + ssn = self.conn.session() + rcv = ssn.receiver(str(self.address)) + rcv.source.address = str(self.address) + ssn.open() + rcv.open() + self.conn.open() + + def on_transport_closed(self, event): + conn = event.context.connection + self.conn = None + self.driver.schedule(self.connect, 1) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/db_common.py ---------------------------------------------------------------------- diff --git a/examples/engine/py/db_common.py b/examples/engine/py/db_common.py new file mode 100644 index 0000000..584c15a --- /dev/null +++ b/examples/engine/py/db_common.py @@ -0,0 +1,93 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
# See the License for the
# specific language governing permissions and limitations
# under the License.
#

try:
    import Queue  # Python 2
except ImportError:
    import queue as Queue  # Python 3
import sqlite3
import threading
import traceback


class Db(object):
    """Serialise access to a SQLite ``records`` table through a worker thread.

    The public methods never touch the database themselves: each one puts a
    callable on ``self.tasks``.  A daemon thread started by the constructor
    opens its own connection to ``db`` and runs the queued callables in
    order.  Completion is signalled by handing the caller-supplied event
    object to ``events.trigger``.
    """

    def __init__(self, db, events):
        """Start the worker thread.

        db -- database name passed to ``sqlite3.connect``
        events -- object whose ``trigger(event)`` method is called to report
                  finished work
        """
        self.db = db
        self.events = events
        self.tasks = Queue.Queue()
        # id of the last row handed out by _load(); None means "start over".
        self.position = None
        # Events for inserts/deletes, held back until the batch is committed.
        self.pending_events = []
        self.thread = threading.Thread(target=self._process)
        self.thread.daemon = True
        self.thread.start()

    def reset(self):
        """Queue a request to forget the load position."""
        self.tasks.put(lambda conn: self._reset())

    def load(self, records, event=None):
        """Queue a request to fill the ``records`` queue with further rows."""
        self.tasks.put(lambda conn: self._load(conn, records, event))

    def insert(self, id, data, event=None):
        """Queue an insert of ``data``; a falsy ``id`` lets SQLite pick one."""
        self.tasks.put(lambda conn: self._insert(conn, id, data, event))

    def delete(self, id, event=None):
        """Queue deletion of the row with the given ``id``."""
        self.tasks.put(lambda conn: self._delete(conn, id, event))

    def _reset(self, ignored=None):
        self.position = None

    def _load(self, conn, records, event):
        # Compare against None explicitly so that a position of 0 is still
        # treated as "rows up to id 0 were already loaded".
        if self.position is not None:
            cursor = conn.execute("SELECT * FROM records WHERE id > ? ORDER BY id", (self.position,))
        else:
            cursor = conn.execute("SELECT * FROM records ORDER BY id")
        while not records.full():
            row = cursor.fetchone()
            if row:
                self.position = row['id']
                records.put(dict(row))
            else:
                break
        # A load changes nothing in the database, so its event is triggered
        # straight away instead of waiting for the commit.
        if event:
            self.events.trigger(event)

    def _insert(self, conn, id, data, event):
        if id:
            conn.execute("INSERT INTO records(id, description) VALUES (?, ?)", (id, data))
        else:
            conn.execute("INSERT INTO records(description) VALUES (?)", (data,))
        if event:
            self.pending_events.append(event)

    def _delete(self, conn, id, event):
        conn.execute("DELETE FROM records WHERE id=?", (id,))
        if event:
            self.pending_events.append(event)

    def _run_task(self, conn, task):
        """Run one queued task, reporting (not propagating) database errors.

        Without this a single failing statement, e.g. an insert with a
        duplicate id, would end the worker thread and every later task
        would be silently ignored.
        """
        try:
            task(conn)
        except sqlite3.Error:
            traceback.print_exc()

    def _process(self):
        """Worker loop: run queued tasks in batches, one commit per batch."""
        conn = sqlite3.connect(self.db)
        conn.row_factory = sqlite3.Row
        with conn:
            while True:
                # Block for the first task, then drain whatever else is
                # already queued so the whole batch shares one commit.
                task = self.tasks.get(True)
                try:
                    while True:
                        self._run_task(conn, task)
                        task = self.tasks.get(False)
                except Queue.Empty:
                    pass
                conn.commit()
                # Only now that the changes are committed are the
                # insert/delete events released.
                for event in self.pending_events:
                    self.events.trigger(event)
                self.pending_events = []


# http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/db_ctrl.py
# ----------------------------------------------------------------------
# diff --git a/examples/engine/py/db_ctrl.py b/examples/engine/py/db_ctrl.py
# new file mode 100755
# index 0000000..b28e0eb
# --- /dev/null
# +++ b/examples/engine/py/db_ctrl.py
# @@ -0,0 +1,46 @@
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

import sqlite3
import sys


def main(argv=None):
    """Initialise, list or fill the ``records`` table of a SQLite database.

    argv -- argument list laid out like ``sys.argv`` (program name, command,
            database name); defaults to ``sys.argv``.

    Commands:
      init   -- drop and re-create the ``records`` table
      list   -- print every row
      insert -- store each line read from standard input as one record
    """
    if argv is None:
        argv = sys.argv
    if len(argv) < 3:
        print("Usage: %s [init|insert|list] db" % argv[0])
        return
    command = argv[1]
    conn = sqlite3.connect(argv[2])
    try:
        with conn:
            if command == "init":
                conn.execute("DROP TABLE IF EXISTS records")
                conn.execute("CREATE TABLE records(id INTEGER PRIMARY KEY AUTOINCREMENT, description TEXT)")
                conn.commit()
            elif command == "list":
                cursor = conn.cursor()
                cursor.execute("SELECT * FROM records")
                for row in cursor.fetchall():
                    print(row)
            elif command == "insert":
                # readline() rather than iterating the file object, so that
                # each line is handled as soon as it has been typed.
                for line in iter(sys.stdin.readline, ""):
                    conn.execute("INSERT INTO records(description) VALUES (?)", (line.rstrip(),))
                    # Commit per line so that a record becomes visible to
                    # other processes as soon as it is entered.
                    conn.commit()
            else:
                print("Unrecognised command: %s" % command)
    finally:
        # "with conn" only manages the transaction; close explicitly.
        conn.close()


if __name__ == "__main__":
    main()

# http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/db_recv.py
# ----------------------------------------------------------------------
# diff --git a/examples/engine/py/db_recv.py b/examples/engine/py/db_recv.py
# new file mode 100755
# index 0000000..8b4490d
# --- /dev/null
# +++ b/examples/engine/py/db_recv.py
# @@ -0,0 +1,54 @@
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

from proton.handlers import MessagingHandler
from proton.reactors import ApplicationEvent, Container
from db_common import Db


class Recv(MessagingHandler):
    """Receiver that stores each new message in a database before accepting it."""

    def __init__(self, url):
        # Automatic acceptance is switched off: this class calls accept()
        # itself.
        super(Recv, self).__init__(auto_accept=False)
        self.url = url
        self.delay = 0
        # TODO: load last tag from db
        self.last_id = None

    def on_start(self, event):
        container = event.container
        self.db = Db("dst_db", container.get_event_trigger())
        container.create_receiver(self.url)

    def on_record_inserted(self, event):
        # NOTE(review): presumably invoked for the "record_inserted"
        # ApplicationEvent queued in on_message -- confirm against the library.
        self.accept(event.delivery)

    def on_message(self, event):
        msg_id = int(event.message.id)
        already_seen = bool(self.last_id) and msg_id <= self.last_id
        if already_seen:
            # Nothing to store; accept straight away.
            self.accept(event.delivery)
            return
        self.last_id = msg_id
        inserted = ApplicationEvent("record_inserted", delivery=event.delivery)
        self.db.insert(msg_id, event.message.body, inserted)
        print("inserted message %s" % msg_id)


try:
    Container(Recv("localhost:5672/examples")).run()
except KeyboardInterrupt:
    pass

# http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/db_send.py
# ----------------------------------------------------------------------
# diff --git a/examples/engine/py/db_send.py b/examples/engine/py/db_send.py
# new file mode 100755
# index 0000000..ce3ce79
# --- /dev/null
# +++ b/examples/engine/py/db_send.py
# @@ -0,0 +1,85 @@
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

import Queue
import time
from proton import Message
from proton.handlers import MessagingHandler
from proton.reactors import ApplicationEvent, Container
from db_common import Db


class Send(MessagingHandler):
    """Sender that publishes database records and deletes them once settled."""

    def __init__(self, url):
        super(Send, self).__init__()
        self.url = url
        self.delay = 0
        self.sent = 0
        # Number of load requests issued so far; also used as the subject of
        # the "records_loaded" event so that stale notifications can be told
        # apart from the latest one.
        self.load_count = 0
        self.records = Queue.Queue(maxsize=50)

    def on_start(self, event):
        self.container = event.container
        self.db = Db("src_db", self.container.get_event_trigger())
        self.sender = self.container.create_sender(self.url)

    def on_records_loaded(self, event):
        if not self.records.empty():
            self.send()
            return
        if event.subject == self.load_count:
            print("Exhausted available data, waiting to recheck...")
            # check for new data after 5 seconds
            self.container.schedule(time.time() + 5, link=self.sender, subject="data")

    def request_records(self):
        if self.records.full():
            return
        print("loading records...")
        self.load_count += 1
        loaded = ApplicationEvent("records_loaded", link=self.sender, subject=self.load_count)
        self.db.load(self.records, event=loaded)

    def on_credit(self, event):
        self.send()

    def send(self):
        # Send for as long as there is both credit and buffered data, then
        # ask the database to top the buffer up again.
        while self.sender.credit and not self.records.empty():
            record = self.records.get(False)
            record_id = record['id']
            message = Message(id=record_id, durable=True, body=record['description'])
            self.sender.send_msg(message, tag=str(record_id))
            self.sent += 1
            print("sent message %s" % record_id)
        self.request_records()

    def on_settled(self, event):
        # The delivery tag was set to the record id in send().
        record_id = int(event.delivery.tag)
        self.db.delete(record_id)
        print("settled message %s" % record_id)

    def on_disconnected(self, event):
        self.db.reset()

    def on_timer(self, event):
        if event.subject == "data":
            print("Rechecking for data...")
            self.request_records()


try:
    Container(Send("localhost:5672/examples")).run()
except KeyboardInterrupt:
    pass

# http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/helloworld.py
# ----------------------------------------------------------------------
# diff --git a/examples/engine/py/helloworld.py b/examples/engine/py/helloworld.py
# new file mode 100755
# index 0000000..92d6083
# --- /dev/null
# +++ b/examples/engine/py/helloworld.py
# @@ -0,0 +1,45 @@
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.
# See the License for the
# specific language governing permissions and limitations
# under the License.
#

from proton import Message
from proton.handlers import MessagingHandler
from proton.reactors import Container


class HelloWorld(MessagingHandler):
    """Send one greeting to an address and print it when it is received."""

    def __init__(self, server, address):
        super(HelloWorld, self).__init__()
        self.address = address
        self.server = server

    def on_start(self, event):
        # A single connection carries both the receiving and sending link.
        container = event.container
        connection = container.connect(self.server)
        container.create_receiver(connection, self.address)
        container.create_sender(connection, self.address)

    def on_credit(self, event):
        greeting = Message(body=u"Hello World!")
        event.sender.send_msg(greeting)
        event.sender.close()

    def on_message(self, event):
        print(event.message.body)
        event.connection.close()


Container(HelloWorld("localhost:5672", "examples")).run()

# http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/helloworld_blocking.py
# ----------------------------------------------------------------------
# diff --git a/examples/engine/py/helloworld_blocking.py b/examples/engine/py/helloworld_blocking.py
# new file mode 100755
# index 0000000..9c5e062
# --- /dev/null
# +++ b/examples/engine/py/helloworld_blocking.py
# @@ -0,0 +1,35 @@
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.
# See the License for the
# specific language governing permissions and limitations
# under the License.
#

from proton import Message
from proton.utils import BlockingConnection
from proton.handlers import IncomingMessageHandler


class HelloWorldReceiver(IncomingMessageHandler):
    """Print the body of a received message, then close its connection."""

    def on_message(self, event):
        print(event.message.body)
        event.connection.close()


conn = BlockingConnection("localhost:5672")
conn.create_receiver("examples", handler=HelloWorldReceiver())
sender = conn.create_sender("examples")
greeting = Message(body=u"Hello World!")
sender.send_msg(greeting)
conn.run()

# http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/helloworld_direct.py
# ----------------------------------------------------------------------
# diff --git a/examples/engine/py/helloworld_direct.py b/examples/engine/py/helloworld_direct.py
# new file mode 100755
# index 0000000..c961fe5
# --- /dev/null
# +++ b/examples/engine/py/helloworld_direct.py
# @@ -0,0 +1,47 @@
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

from proton import Message
from proton.handlers import MessagingHandler
from proton.reactors import Container


class HelloWorld(MessagingHandler):
    """Listen on a URL and send a greeting to that same URL, without a broker."""

    def __init__(self, url):
        super(HelloWorld, self).__init__()
        self.url = url

    def on_start(self, event):
        container = event.container
        # Keep the acceptor so that it can be closed again when done.
        self.acceptor = container.listen(self.url)
        container.create_sender(self.url)

    def on_credit(self, event):
        greeting = Message(body=u"Hello World!")
        event.sender.send_msg(greeting)
        event.sender.close()

    def on_message(self, event):
        print(event.message.body)

    def on_accepted(self, event):
        event.connection.close()

    def on_connection_closed(self, event):
        self.acceptor.close()


Container(HelloWorld("localhost:8888/examples")).run()

# http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/helloworld_direct_tornado.py
# ----------------------------------------------------------------------
# diff --git a/examples/engine/py/helloworld_direct_tornado.py b/examples/engine/py/helloworld_direct_tornado.py
# new file mode 100755
# index 0000000..8873357
# --- /dev/null
# +++ b/examples/engine/py/helloworld_direct_tornado.py
# @@ -0,0 +1,52 @@
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

from proton import Message
from proton.handlers import MessagingHandler
from proton_tornado import TornadoLoop


class HelloWorld(MessagingHandler):
    """Direct (broker-less) hello world driven by the Tornado based loop."""

    def __init__(self, server, address):
        super(HelloWorld, self).__init__()
        self.address = address
        self.server = server

    def on_start(self, event):
        loop = event.container
        self.eventloop = loop
        # Listen and connect on the same address: the example talks to itself.
        self.acceptor = loop.listen(self.server)
        connection = loop.connect(self.server)
        loop.create_sender(connection, self.address)

    def on_credit(self, event):
        greeting = Message(body=u"Hello World!")
        event.sender.send_msg(greeting)
        event.sender.close()

    def on_message(self, event):
        print(event.message.body)

    def on_accepted(self, event):
        event.connection.close()

    def on_connection_closed(self, event):
        self.acceptor.close()
        self.eventloop.stop()


TornadoLoop(HelloWorld("localhost:8888", "examples")).run()

# http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/helloworld_tornado.py
# ----------------------------------------------------------------------
# diff --git a/examples/engine/py/helloworld_tornado.py b/examples/engine/py/helloworld_tornado.py
# new file mode 100755
# index 0000000..f7d4c26
# --- /dev/null
# +++ b/examples/engine/py/helloworld_tornado.py
# @@ -0,0 +1,49 @@
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.
# See the License for the
# specific language governing permissions and limitations
# under the License.
#

from proton import Message
from proton.handlers import MessagingHandler
from proton_tornado import TornadoLoop


class HelloWorld(MessagingHandler):
    """Hello world via a server, driven by the Tornado based loop."""

    def __init__(self, server, address):
        super(HelloWorld, self).__init__()
        self.address = address
        self.server = server

    def on_start(self, event):
        loop = event.container
        self.eventloop = loop
        connection = loop.connect(self.server)
        loop.create_receiver(connection, self.address)
        loop.create_sender(connection, self.address)

    def on_credit(self, event):
        greeting = Message(body=u"Hello World!")
        event.sender.send_msg(greeting)
        event.sender.close()

    def on_message(self, event):
        print(event.message.body)
        event.connection.close()

    def on_connection_closed(self, event):
        # Stop the loop explicitly once the connection has gone.
        self.eventloop.stop()


TornadoLoop(HelloWorld("localhost:5672", "examples")).run()

# http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/proton_server.py
# ----------------------------------------------------------------------
# diff --git a/examples/engine/py/proton_server.py b/examples/engine/py/proton_server.py
# new file mode 100644
# index 0000000..8a5077b
# --- /dev/null
# +++ b/examples/engine/py/proton_server.py
# @@ -0,0 +1,61 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

from proton import Message
from proton.reactors import Container
from proton.handlers import MessagingHandler


class Server(MessagingHandler):
    """Base class for request/response servers.

    Subclasses override on_request() and answer by calling send().
    """

    def __init__(self, host, address):
        """Connect to ``host`` and create a receiver for ``address``."""
        super(Server, self).__init__()
        self.container = Container(self)
        self.conn = self.container.connect(host)
        self.receiver = self.container.create_receiver(self.conn, address)
        # Senders created so far, keyed by reply-to address.
        self.senders = {}
        # Sender with no fixed target, created only if the peer offers the
        # ANONYMOUS-RELAY capability.
        self.relay = None

    def on_message(self, event):
        self.on_request(event.message.body, event.message.reply_to)

    def on_connection_opened(self, event):
        """Create the relay sender if the peer offers ANONYMOUS-RELAY."""
        capabilities = event.connection.remote_offered_capabilities
        # The "is None" check keeps this safe should it ever be invoked under
        # both names.
        if self.relay is None and capabilities and "ANONYMOUS-RELAY" in capabilities:
            self.relay = self.container.create_sender(self.conn, None)

    # Every other handler in these examples uses the past-tense names
    # (on_connection_opened / on_connection_closed); the original name is
    # kept as an alias for existing callers and subclasses.
    on_connection_open = on_connection_opened

    def on_connection_close(self, endpoint, error):
        # NOTE(review): this name and (endpoint, error) signature differ from
        # the single-``event`` handlers used everywhere else here, so it may
        # never be invoked -- confirm against the handler API.
        if error:
            print("Closed due to %s" % error)
        self.conn.close()

    def run(self):
        self.container.run()

    def send(self, response, reply_to):
        """Send ``response`` to ``reply_to``, via the relay when there is one."""
        sender = self.relay
        if not sender:
            sender = self.senders.get(reply_to)
        if not sender:
            sender = self.container.create_sender(self.conn, reply_to)
            self.senders[reply_to] = sender
        msg = Message(body=response)
        if self.relay:
            # The relay has no target of its own, so the message carries the
            # destination address.
            msg.address = reply_to
        sender.send_msg(msg)

    def on_request(self, request, reply_to):
        """Hook for subclasses; the default ignores the request."""
        pass

# http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/proton_tornado.py
# ----------------------------------------------------------------------
# diff --git a/examples/engine/py/proton_tornado.py b/examples/engine/py/proton_tornado.py
# new file mode 100644
# index 0000000..cfe7d6f
# ---
# /dev/null
# +++ b/examples/engine/py/proton_tornado.py
# @@ -0,0 +1,70 @@
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

from proton.reactors import ApplicationEvent, Container, StartEvent
import tornado.ioloop


class TornadoLoop(Container):
    """Container variant that uses the Tornado IOLoop for I/O and timers."""

    def __init__(self, *handlers):
        super(TornadoLoop, self).__init__(*handlers)
        self.loop = tornado.ioloop.IOLoop.current()

    def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None):
        """Connect as the base class does, then process pending events."""
        conn = super(TornadoLoop, self).connect(url, urls, address, handler, reconnect)
        self.events.process()
        return conn

    def schedule(self, deadline, connection=None, session=None, link=None, delivery=None, subject=None):
        """Have the IOLoop dispatch a "timer" ApplicationEvent at ``deadline``."""
        self.loop.call_at(deadline, self.events.dispatch, ApplicationEvent("timer", connection, session, link, delivery, subject))

    def add(self, conn):
        self.loop.add_handler(conn, self._connection_ready, tornado.ioloop.IOLoop.READ | tornado.ioloop.IOLoop.WRITE)

    def remove(self, conn):
        self.loop.remove_handler(conn)

    def run(self):
        self.events.dispatch(StartEvent(self))
        self.loop.start()

    def stop(self):
        self.loop.stop()

    def _get_event_flags(self, conn):
        flags = 0
        if conn.reading():
            flags |= tornado.ioloop.IOLoop.READ
        # FIXME: need way to update flags to avoid busy loop
        #if conn.writing():
        #    flags |= tornado.ioloop.IOLoop.WRITE
        flags |= tornado.ioloop.IOLoop.WRITE
        return flags

    def _connection_ready(self, conn, events):
        """IOLoop callback: service ``conn`` for the reported ``events``."""
        if events & tornado.ioloop.IOLoop.READ:
            conn.readable()
        if events & tornado.ioloop.IOLoop.WRITE:
            conn.writable()
        failed = events & tornado.ioloop.IOLoop.ERROR  # or conn.closed()
        if failed:
            self.loop.remove_handler(conn)
            conn.close()
            conn.removed()
        self.events.process()
        # A connection that was just removed is no longer registered with the
        # IOLoop, so its flags must not be updated.
        if not failed:
            self.loop.update_handler(conn, self._get_event_flags(conn))

# http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/recurring_timer.py
# ----------------------------------------------------------------------
# diff --git a/examples/engine/py/recurring_timer.py b/examples/engine/py/recurring_timer.py
# new file mode 100755
# index 0000000..de530d3
# --- /dev/null
# +++ b/examples/engine/py/recurring_timer.py
# @@ -0,0 +1,43 @@
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

import time
from proton.reactors import Container, Handler


class Recurring(Handler):
    """Print "Tick..." every ``period`` seconds by re-arming a timer."""

    def __init__(self, period):
        self.period = period

    def on_start(self, event):
        self.container = event.container
        self.container.schedule(time.time() + self.period, subject=self)

    def on_timer(self, event):
        print("Tick...")
        # Re-arm the timer so that the tick recurs.
        self.container.schedule(time.time() + self.period, subject=self)


# Build the container outside the try block: the KeyboardInterrupt handler
# uses ``container`` and would fail with a NameError if the interrupt
# arrived before the name was bound.
container = Container(Recurring(1.0))
try:
    container.run()
except KeyboardInterrupt:
    container.stop()
    print("")

# http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/recurring_timer_tornado.py
# ----------------------------------------------------------------------
# diff --git a/examples/engine/py/recurring_timer_tornado.py b/examples/engine/py/recurring_timer_tornado.py
# new file mode 100755
# index 0000000..aeeb20c
# --- /dev/null
# +++ b/examples/engine/py/recurring_timer_tornado.py
# @@ -0,0 +1,44 @@
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

import time
from proton.reactors import Handler
from proton_tornado import TornadoLoop


class Recurring(Handler):
    """Print "Tick..." every ``period`` seconds by re-arming a timer."""

    def __init__(self, period):
        self.period = period

    def on_start(self, event):
        self.container = event.container
        self.container.schedule(time.time() + self.period, subject=self)

    def on_timer(self, event):
        print("Tick...")
        # Re-arm the timer so that the tick recurs.
        self.container.schedule(time.time() + self.period, subject=self)


# Build the loop outside the try block: the KeyboardInterrupt handler uses
# ``container`` and would fail with a NameError if the interrupt arrived
# before the name was bound.
container = TornadoLoop(Recurring(1.0))
try:
    container.run()
except KeyboardInterrupt:
    container.stop()
    print("")

# http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/selected_recv.py
# ----------------------------------------------------------------------
# diff --git a/examples/engine/py/selected_recv.py b/examples/engine/py/selected_recv.py
# new file mode 100755
# index 0000000..d0df3b5
# --- /dev/null
# +++ b/examples/engine/py/selected_recv.py
# @@ -0,0 +1,40 @@
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

from proton.reactors import Container, Selector
from proton.handlers import MessagingHandler


class Recv(MessagingHandler):
    """Print the body of each message received through a selector."""

    def __init__(self):
        super(Recv, self).__init__()

    def on_start(self, event):
        container = event.container
        connection = container.connect("localhost:5672")
        green_only = Selector(u"colour = 'green'")
        container.create_receiver(connection, "examples", options=green_only)

    def on_message(self, event):
        print(event.message.body)


try:
    Container(Recv()).run()
except KeyboardInterrupt:
    pass

# http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/server.py
# ----------------------------------------------------------------------
# diff --git a/examples/engine/py/server.py b/examples/engine/py/server.py
# new file mode 100755
# index 0000000..3e6aad4
# --- /dev/null
# +++ b/examples/engine/py/server.py
# @@ -0,0 +1,56 @@
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

from proton import Message
from proton.handlers import MessagingHandler
from proton.reactors import Container


class Server(MessagingHandler):
    """Reply to each request with its body converted to upper case."""

    def __init__(self, host, address):
        super(Server, self).__init__()
        self.address = address
        self.host = host

    def on_start(self, event):
        container = event.container
        self.container = container
        self.conn = container.connect(self.host)
        self.receiver = container.create_receiver(self.conn, self.address)
        # Senders created so far, keyed by reply-to address.
        self.senders = {}
        # Sender with no fixed target, created only if the peer offers the
        # ANONYMOUS-RELAY capability.
        self.relay = None

    def on_connection_opened(self, event):
        capabilities = event.connection.remote_offered_capabilities
        if capabilities and 'ANONYMOUS-RELAY' in capabilities:
            self.relay = self.container.create_sender(self.conn, None)

    def on_message(self, event):
        reply_to = event.message.reply_to
        # Prefer the relay; otherwise reuse or create a sender per address.
        sender = self.relay or self.senders.get(reply_to)
        if not sender:
            sender = self.container.create_sender(self.conn, reply_to)
            self.senders[reply_to] = sender
        response = Message(address=reply_to, body=event.message.body.upper())
        sender.send_msg(response)


try:
    Container(Server("localhost:5672", "examples")).run()
except KeyboardInterrupt:
    pass

# http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/server_tx.py
# ----------------------------------------------------------------------
# diff --git a/examples/engine/py/server_tx.py b/examples/engine/py/server_tx.py
# new file mode 100755
# index 0000000..0305a3f
# --- /dev/null
# +++ b/examples/engine/py/server_tx.py
# @@ -0,0 +1,77 @@
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from proton import Message +from proton.reactors import Container +from proton.handlers import MessagingHandler, TransactionHandler + +class TxRequest(TransactionHandler): + def __init__(self, response, sender, request_delivery): + super(TxRequest, self).__init__() + self.response = response + self.sender = sender + self.request_delivery = request_delivery + + def on_transaction_declared(self, event): + self.sender.send_msg(self.response, transaction=event.transaction) + self.accept(self.request_delivery, transaction=event.transaction) + event.transaction.commit() + + def on_transaction_committed(self, event): + print "Request processed successfully" + + def on_transaction_aborted(self, event): + print "Request processing aborted" + + +class TxServer(MessagingHandler): + def __init__(self, host, address): + super(TxServer, self).__init__(auto_accept=False) + self.host = host + self.address = address + + def on_start(self, event): + self.container = event.container + self.conn = event.container.connect(self.host, reconnect=False) + self.receiver = event.container.create_receiver(self.conn, self.address) + self.senders = {} + self.relay = None + + def on_message(self, event): + sender = self.relay + if not sender: + sender = self.senders.get(event.message.reply_to) + if not sender: + sender = self.container.create_sender(self.conn, event.message.reply_to) + self.senders[event.message.reply_to] = sender + + response = Message(address=event.message.reply_to, body=event.message.body.upper()) + self.container.declare_transaction(self.conn, 
handler=TxRequest(response, sender, event.delivery)) + + def on_connection_open(self, event): + if event.connection.remote_offered_capabilities and 'ANONYMOUS-RELAY' in event.connection.remote_offered_capabilities: + self.relay = self.container.create_sender(self.conn, None) + +try: + Container(TxServer("localhost:5672", "examples")).run() +except KeyboardInterrupt: pass + + + http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/simple_recv.py ---------------------------------------------------------------------- diff --git a/examples/engine/py/simple_recv.py b/examples/engine/py/simple_recv.py new file mode 100755 index 0000000..6825c86 --- /dev/null +++ b/examples/engine/py/simple_recv.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +from proton.handlers import MessagingHandler +from proton.reactors import Container + +class Recv(MessagingHandler): + def __init__(self, url): + super(Recv, self).__init__() + self.url = url + + def on_start(self, event): + event.container.create_receiver(self.url) + + def on_message(self, event): + print event.message.body + +try: + Container(Recv("localhost:5672/examples")).run() +except KeyboardInterrupt: pass + + + http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/simple_send.py ---------------------------------------------------------------------- diff --git a/examples/engine/py/simple_send.py b/examples/engine/py/simple_send.py new file mode 100755 index 0000000..21530ef --- /dev/null +++ b/examples/engine/py/simple_send.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +from proton import Message +from proton.handlers import MessagingHandler +from proton.reactors import Container + +class Send(MessagingHandler): + def __init__(self, url, messages): + super(Send, self).__init__() + self.url = url + self.sent = 0 + self.confirmed = 0 + self.total = messages + + def on_start(self, event): + event.container.create_sender(self.url) + + def on_credit(self, event): + while event.sender.credit and self.sent < self.total: + msg = Message(body={'sequence':(self.sent+1)}) + event.sender.send_msg(msg) + self.sent += 1 + + def on_accepted(self, event): + self.confirmed += 1 + if self.confirmed == self.total: + print "all messages confirmed" + event.connection.close() + + def on_disconnected(self, event): + self.sent = self.confirmed + +try: + Container(Send("localhost:5672/examples", 10000)).run() +except KeyboardInterrupt: pass http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/sync_client.py ---------------------------------------------------------------------- diff --git a/examples/engine/py/sync_client.py b/examples/engine/py/sync_client.py new file mode 100755 index 0000000..362385a --- /dev/null +++ b/examples/engine/py/sync_client.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +Demonstrates the client side of the synchronous request-response pattern +(also known as RPC or Remote Procedure Call) using proton. + +""" + +from proton import Message, Url, ConnectionException, Timeout +from proton.utils import BlockingConnection +from proton.handlers import IncomingMessageHandler +import sys + +class SyncRequestClient(IncomingMessageHandler): + """ + Implementation of the synchronous request-response (aka RPC) pattern. + Create an instance and call invoke() to send a request and wait for a response. + """ + + def __init__(self, url, timeout=None): + """ + @param url: a proton.Url or a URL string of the form 'host:port/path' + host:port is used to connect, path is used to identify the remote messaging endpoint. + """ + super(SyncRequestClient, self).__init__() + self.connection = BlockingConnection(Url(url).defaults(), timeout=timeout) + self.sender = self.connection.create_sender(url.path) + # dynamic=true generates a unique address dynamically for this receiver. + # credit=1 because we want to receive 1 response message initially. + self.receiver = self.connection.create_receiver(None, dynamic=True, credit=1, handler=self) + self.response = None + + def invoke(self, request): + """Send a request, wait for and return the response""" + request.reply_to = self.reply_to + self.sender.send_msg(request) + self.connection.wait(lambda: self.response, msg="Waiting for response") + response = self.response + self.response = None # Ready for next response. + self.receiver.flow(1) # Set up credit for the next response.
+ return response + + @property + def reply_to(self): + """Return the dynamic address of our receiver.""" + return self.receiver.remote_source.address + + def on_message(self, event): + """Called when we receive a message for our receiver.""" + self.response = event.message # Store the response + + def close(self): + self.connection.close() + + +if __name__ == '__main__': + url = Url("0.0.0.0/examples") + if len(sys.argv) > 1: url = Url(sys.argv[1]) + + invoker = SyncRequestClient(url, timeout=2) + try: + REQUESTS= ["Twas brillig, and the slithy toves", + "Did gire and gymble in the wabe.", + "All mimsy were the borogroves,", + "And the mome raths outgrabe."] + for request in REQUESTS: + response = invoker.invoke(Message(body=request)) + print "%s => %s" % (request, response.body) + finally: + invoker.close() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/tx_recv.py ---------------------------------------------------------------------- diff --git a/examples/engine/py/tx_recv.py b/examples/engine/py/tx_recv.py new file mode 100755 index 0000000..fc4bb8a --- /dev/null +++ b/examples/engine/py/tx_recv.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +from proton.reactors import Container +from proton.handlers import TransactionalClientHandler + +class TxRecv(TransactionalClientHandler): + def __init__(self, batch_size): + super(TxRecv, self).__init__(prefetch=0) + self.current_batch = 0 + self.batch_size = batch_size + + def on_start(self, event): + self.container = event.container + self.conn = self.container.connect("localhost:5672") + self.receiver = self.container.create_receiver(self.conn, "examples") + self.container.declare_transaction(self.conn, handler=self) + self.transaction = None + + def on_message(self, event): + print event.message.body + self.accept(event.delivery, self.transaction) + self.current_batch += 1 + if self.current_batch == self.batch_size: + self.transaction.commit() + self.transaction = None + + def on_transaction_declared(self, event): + self.receiver.flow(self.batch_size) + self.transaction = event.transaction + + def on_transaction_committed(self, event): + self.current_batch = 0 + self.container.declare_transaction(self.conn, handler=self) + + def on_disconnected(self, event): + self.current_batch = 0 + +try: + Container(TxRecv(10)).run() +except KeyboardInterrupt: pass + + + http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/tx_recv_interactive.py ---------------------------------------------------------------------- diff --git a/examples/engine/py/tx_recv_interactive.py b/examples/engine/py/tx_recv_interactive.py new file mode 100755 index 0000000..6eb320e --- /dev/null +++ b/examples/engine/py/tx_recv_interactive.py @@ -0,0 +1,83 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import sys +import threading +from proton.reactors import ApplicationEvent, Container +from proton.handlers import TransactionalClientHandler + +class TxRecv(TransactionalClientHandler): + def __init__(self): + super(TxRecv, self).__init__(prefetch=0) + + def on_start(self, event): + self.container = event.container + self.conn = self.container.connect("localhost:5672") + self.receiver = self.container.create_receiver(self.conn, "examples") + self.container.declare_transaction(self.conn, handler=self, settle_before_discharge=True) + self.transaction = None + + def on_message(self, event): + print event.message.body + self.transaction.accept(event.delivery) + + def on_transaction_declared(self, event): + self.transaction = event.transaction + print "transaction declared" + + def on_transaction_committed(self, event): + print "transaction committed" + self.container.declare_transaction(self.conn, handler=self) + + def on_transaction_aborted(self, event): + print "transaction aborted" + self.container.declare_transaction(self.conn, handler=self) + + def on_commit(self, event): + self.transaction.commit() + + def on_abort(self, event): + self.transaction.abort() + + def on_fetch(self, event): + self.receiver.flow(1) + + def on_quit(self, event): + c = self.receiver.connection + self.receiver.close() + c.close() + +try: + reactor = Container(TxRecv()) + events = reactor.get_event_trigger() + thread = threading.Thread(target=reactor.run) + thread.daemon=True + thread.start() + + print "Enter 'fetch', 'commit' or 'abort'" + while True: + line = 
sys.stdin.readline() + if line: + events.trigger(ApplicationEvent(line.strip())) + else: + break +except KeyboardInterrupt: pass + + --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
