NO-JIRA: add a simple broker against which intermediated examples can be run
(cherry picked from commit 8235ba1f1da41e67c284b866777b28118e4691d8) Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/0eefdb11 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/0eefdb11 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/0eefdb11 Branch: refs/heads/0.9.x Commit: 0eefdb11f2e0076ab4116ade5c316476ae09546e Parents: 913cb95 Author: Gordon Sim <[email protected]> Authored: Mon Mar 9 14:24:52 2015 +0000 Committer: Robert Gemmell <[email protected]> Committed: Mon Apr 27 15:12:49 2015 +0100 ---------------------------------------------------------------------- examples/python/README | 10 +- examples/python/broker.py | 126 +++++++++++++++++++++++ proton-c/bindings/python/proton/handlers.py | 2 +- 3 files changed, 130 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0eefdb11/examples/python/README ---------------------------------------------------------------------- diff --git a/examples/python/README b/examples/python/README index a361fc4..8c60fd8 100644 --- a/examples/python/README +++ b/examples/python/README @@ -1,7 +1,9 @@ Most (though not all) of the current examples require a broker or similar intermediary that supports the AMQP 1.0 protocol, allows anonymous connections and accepts links to and from a node named -'examples'. +'examples'. A very simple broker emulating script - broker.py - is +provided against which the examples can also be run (transactions are +not yet supported in this script). ------------------------------------------------------------------ @@ -85,12 +87,6 @@ tx_send.py A sender that sends messages in atomic batches using local transactions (this example does not persist the messages in anyway). -tx_send_sync.py - -A variant of the former example that waits for all messages in a batch -to be acknowledged before committing. Used only to work around an -ordering issue in preoton that affected qpidd. - tx_recv.py A receiver example that accepts batches of messages using local http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0eefdb11/examples/python/broker.py ---------------------------------------------------------------------- diff --git a/examples/python/broker.py b/examples/python/broker.py new file mode 100755 index 0000000..13eb97c --- /dev/null +++ b/examples/python/broker.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import collections, optparse +from proton import Endpoint, generate_uuid +from proton.handlers import MessagingHandler +from proton.reactor import Container + +class Queue(object): + def __init__(self, dynamic=False): + self.dynamic = dynamic + self.queue = collections.deque() + self.consumers = [] + + def subscribe(self, consumer): + self.consumers.append(consumer) + + def unsubscribe(self, consumer): + if consumer in self.consumers: + self.consumers.remove(consumer) + return len(self.consumers) == 0 and (self.dynamic or self.queue.count == 0) + + def publish(self, message): + self.queue.append(message) + self.dispatch() + + def dispatch(self, consumer=None): + if consumer: + c = [consumer] + else: + c = self.consumers + while self._deliver_to(c): pass + + def _deliver_to(self, consumers): + try: + result = False + for c in consumers: + if c.credit: + c.send(self.queue.popleft()) + result = True + return result + except IndexError: # no more messages + return False + +class Broker(MessagingHandler): + def __init__(self, url): + super(Broker, self).__init__() + self.url = url + self.queues = {} + + def on_start(self, event): + self.acceptor = event.container.listen(self.url) + + def _queue(self, address): + if address not in self.queues: + self.queues[address] = Queue() + return self.queues[address] + + def on_link_opening(self, event): + if event.link.is_sender: + if event.link.remote_source.dynamic: + address = str(generate_uuid()) + event.link.source.address = address + q = Queue(True) + self.queues[address] = q + q.subscribe(event.link) + elif event.link.remote_source.address: + event.link.source.address = event.link.remote_source.address + self._queue(event.link.source.address).subscribe(event.link) + elif event.link.remote_target.address: + event.link.target.address = event.link.remote_target.address + + def _unsubscribe(self, link): + if link.source.address in self.queues and self.queues[link.source.address].unsubscribe(link): + del self.queues[link.source.address] + + def on_link_closing(self, event): + if event.link.is_sender: + self._unsubscribe(event.link) + + def on_connection_closing(self, event): + print "on_connection_closing" + self.remove_stale_consumers(event.connection) + + def on_disconnected(self, event): + print "on_disconnected" + self.remove_stale_consumers(event.connection) + + def remove_stale_consumers(self, connection): + l = connection.link_head(Endpoint.REMOTE_ACTIVE) + while l: + if l.is_sender: + self._unsubscribe(l) + l = l.next(Endpoint.REMOTE_ACTIVE) + + def on_sendable(self, event): + self._queue(event.link.source.address).dispatch(event.link) + + def on_message(self, event): + self._queue(event.link.target.address).publish(event.message) + +parser = optparse.OptionParser(usage="usage: %prog [options]") +parser.add_option("-a", "--address", default="localhost:5672", + help="address router listens on (default %default)") +opts, args = parser.parse_args() + +try: + Container(Broker(opts.address)).run() +except KeyboardInterrupt: pass http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0eefdb11/proton-c/bindings/python/proton/handlers.py ---------------------------------------------------------------------- diff --git a/proton-c/bindings/python/proton/handlers.py b/proton-c/bindings/python/proton/handlers.py index 53dda92..5411a1d 100644 --- a/proton-c/bindings/python/proton/handlers.py +++ b/proton-c/bindings/python/proton/handlers.py @@ -370,7 +370,7 @@ class EndpointStateHandler(Handler): self.on_transport_closed(event) def on_transport_closed(self, event): - if self.delegate: + if self.delegate and self.is_local_open(event.connection): dispatch(self.delegate, 'on_disconnected', event) class MessagingHandler(Handler, Acking): --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
