Repository: qpid-proton Updated Branches: refs/heads/examples 2f4d1ba29 -> d4b154cbd
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4b154cb/tutorial/recurring_timer.py ---------------------------------------------------------------------- diff --git a/tutorial/recurring_timer.py b/tutorial/recurring_timer.py deleted file mode 100755 index c641ec6..0000000 --- a/tutorial/recurring_timer.py +++ /dev/null @@ -1,47 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import time -from proton.reactors import EventLoop, Handler - -class Recurring(Handler): - def __init__(self, period): - self.eventloop = EventLoop(self) - self.period = period - self.eventloop.schedule(time.time() + self.period, subject=self) - - def on_timer(self, event): - print "Tick..." 
- self.eventloop.schedule(time.time() + self.period, subject=self) - - def run(self): - self.eventloop.run() - - def stop(self): - self.eventloop.stop() - -try: - app = Recurring(1.0) - app.run() -except KeyboardInterrupt: - app.stop() - print - - http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4b154cb/tutorial/recurring_timer_tornado.py ---------------------------------------------------------------------- diff --git a/tutorial/recurring_timer_tornado.py b/tutorial/recurring_timer_tornado.py deleted file mode 100755 index f4ca260..0000000 --- a/tutorial/recurring_timer_tornado.py +++ /dev/null @@ -1,48 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import time -from proton import Handler -from proton_tornado import TornadoLoop - -class Recurring(Handler): - def __init__(self, period): - self.eventloop = TornadoLoop(self) - self.period = period - self.eventloop.schedule(time.time() + self.period, subject=self) - - def on_timer(self, event): - print "Tick..." 
- self.eventloop.schedule(time.time() + self.period, subject=self) - - def run(self): - self.eventloop.run() - - def stop(self): - self.eventloop.stop() - -try: - app = Recurring(1.0) - app.run() -except KeyboardInterrupt: - app.stop() - print - - http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4b154cb/tutorial/selected_recv.py ---------------------------------------------------------------------- diff --git a/tutorial/selected_recv.py b/tutorial/selected_recv.py deleted file mode 100755 index 8425f3d..0000000 --- a/tutorial/selected_recv.py +++ /dev/null @@ -1,40 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-# - -from proton.reactors import EventLoop, Selector -from proton.handlers import MessagingHandler - -class Recv(MessagingHandler): - def __init__(self): - super(Recv, self).__init__() - - def on_start(self, event): - conn = event.reactor.connect("localhost:5672") - conn.create_receiver("examples", options=Selector(u"colour = 'green'")) - - def on_message(self, event): - print event.message.body - -try: - EventLoop(Recv()).run() -except KeyboardInterrupt: pass - - - http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4b154cb/tutorial/server.py ---------------------------------------------------------------------- diff --git a/tutorial/server.py b/tutorial/server.py deleted file mode 100755 index 6ab5671..0000000 --- a/tutorial/server.py +++ /dev/null @@ -1,55 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-# - -from proton import Message -from proton.handlers import MessagingHandler -from proton.reactors import EventLoop - -class Server(MessagingHandler): - def __init__(self, host, address): - super(Server, self).__init__() - self.host = host - self.address = address - - def on_start(self, event): - self.conn = event.reactor.connect(self.host) - self.receiver = self.conn.create_receiver(self.address) - self.senders = {} - self.relay = None - - def on_connection_opened(self, event): - if event.connection.remote_offered_capabilities and 'ANONYMOUS-RELAY' in event.connection.remote_offered_capabilities: - self.relay = self.conn.create_sender(None) - - def on_message(self, event): - sender = self.relay - if not sender: - sender = self.senders.get(event.message.reply_to) - if not sender: - sender = self.conn.create_sender(event.message.reply_to) - self.senders[event.message.reply_to] = sender - sender.send_msg(Message(address=event.message.reply_to, body=event.message.body.upper())) - -try: - EventLoop(Server("localhost:5672", "examples")).run() -except KeyboardInterrupt: pass - - - http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4b154cb/tutorial/server_tx.py ---------------------------------------------------------------------- diff --git a/tutorial/server_tx.py b/tutorial/server_tx.py deleted file mode 100755 index cda2d0b..0000000 --- a/tutorial/server_tx.py +++ /dev/null @@ -1,77 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from proton import Message -from proton.reactors import EventLoop -from proton.handlers import MessagingHandler, TransactionHandler - -class TxRequest(TransactionHandler): - def __init__(self, response, sender, request_delivery, context): - super(TxRequest, self).__init__() - self.response = response - self.sender = sender - self.request_delivery = request_delivery - self.context = context - - def on_transaction_declared(self, event): - self.sender.send_msg(self.response, transaction=event.transaction) - self.accept(self.request_delivery, transaction=event.transaction) - event.transaction.commit() - - def on_transaction_committed(self, event): - print "Request processed successfully" - - def on_transaction_aborted(self, event): - print "Request processing aborted" - - -class TxServer(MessagingHandler): - def __init__(self, host, address): - super(TxServer, self).__init__(auto_accept=False) - self.host = host - self.address = address - - def on_start(self, event): - self.context = event.reactor.connect(self.host, reconnect=False) - self.receiver = self.context.create_receiver(self.address) - self.senders = {} - self.relay = None - - def on_message(self, event): - sender = self.relay - if not sender: - sender = self.senders.get(event.message.reply_to) - if not sender: - sender = self.context.create_sender(event.message.reply_to) - self.senders[event.message.reply_to] = sender - - response = Message(address=event.message.reply_to, body=event.message.body.upper()) - self.context.declare_transaction(handler=TxRequest(response, sender, event.delivery, 
self.context)) - - def on_connection_open(self, event): - if event.connection.remote_offered_capabilities and 'ANONYMOUS-RELAY' in event.connection.remote_offered_capabilities: - self.relay = self.context.create_sender(None) - -try: - EventLoop(TxServer("localhost:5672", "examples")).run() -except KeyboardInterrupt: pass - - - http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4b154cb/tutorial/simple_recv.py ---------------------------------------------------------------------- diff --git a/tutorial/simple_recv.py b/tutorial/simple_recv.py deleted file mode 100755 index ea80aa6..0000000 --- a/tutorial/simple_recv.py +++ /dev/null @@ -1,42 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-# - -from proton.handlers import MessagingHandler -from proton.reactors import EventLoop - -class Recv(MessagingHandler): - def __init__(self, host, address): - super(Recv, self).__init__() - self.host = host - self.address = address - - def on_start(self, event): - conn = event.reactor.connect(self.host) - conn.create_receiver(self.address) - - def on_message(self, event): - print event.message.body - -try: - EventLoop(Recv("localhost:5672", "examples")).run() -except KeyboardInterrupt: pass - - - http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4b154cb/tutorial/simple_send.py ---------------------------------------------------------------------- diff --git a/tutorial/simple_send.py b/tutorial/simple_send.py deleted file mode 100755 index bbd30ac..0000000 --- a/tutorial/simple_send.py +++ /dev/null @@ -1,55 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-# - -from proton import Message -from proton.handlers import MessagingHandler -from proton.reactors import EventLoop - -class Send(MessagingHandler): - def __init__(self, host, address, messages): - super(Send, self).__init__() - self.host = host - self.address = address - self.sent = 0 - self.confirmed = 0 - self.total = messages - - def on_start(self, event): - conn = event.reactor.connect(self.host) - conn.create_sender(self.address) - - def on_credit(self, event): - while event.sender.credit and self.sent < self.total: - msg = Message(body={'sequence':(self.sent+1)}) - event.sender.send_msg(msg) - self.sent += 1 - - def on_accepted(self, event): - self.confirmed += 1 - if self.confirmed == self.total: - print "all messages confirmed" - event.connection.close() - - def on_disconnected(self, event): - self.sent = self.confirmed - -try: - EventLoop(Send("localhost:5672", "examples", 10000)).run() -except KeyboardInterrupt: pass http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4b154cb/tutorial/sync_client.py ---------------------------------------------------------------------- diff --git a/tutorial/sync_client.py b/tutorial/sync_client.py deleted file mode 100755 index 362385a..0000000 --- a/tutorial/sync_client.py +++ /dev/null @@ -1,88 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. 
See the License for the -# specific language governing permissions and limitations -# under the License. -# - -""" -Demonstrates the client side of the synchronous request-response pattern -(also known as RPC or Remote Procecure Call) using proton. - -""" - -from proton import Message, Url, ConnectionException, Timeout -from proton.utils import BlockingConnection -from proton.handlers import IncomingMessageHandler -import sys - -class SyncRequestClient(IncomingMessageHandler): - """ - Implementation of the synchronous request-responce (aka RPC) pattern. - Create an instance and call invoke() to send a request and wait for a response. - """ - - def __init__(self, url, timeout=None): - """ - @param url: a proton.Url or a URL string of the form 'host:port/path' - host:port is used to connect, path is used to identify the remote messaging endpoint. - """ - super(SyncRequestClient, self).__init__() - self.connection = BlockingConnection(Url(url).defaults(), timeout=timeout) - self.sender = self.connection.create_sender(url.path) - # dynamic=true generates a unique address dynamically for this receiver. - # credit=1 because we want to receive 1 response message initially. - self.receiver = self.connection.create_receiver(None, dynamic=True, credit=1, handler=self) - self.response = None - - def invoke(self, request): - """Send a request, wait for and return the response""" - request.reply_to = self.reply_to - self.sender.send_msg(request) - self.connection.wait(lambda: self.response, msg="Waiting for response") - response = self.response - self.response = None # Ready for next response. - self.receiver.flow(1) # Set up credit for the next response. 
- return response - - @property - def reply_to(self): - """Return the dynamic address of our receiver.""" - return self.receiver.remote_source.address - - def on_message(self, event): - """Called when we receive a message for our receiver.""" - self.response = event.message # Store the response - - def close(self): - self.connection.close() - - -if __name__ == '__main__': - url = Url("0.0.0.0/examples") - if len(sys.argv) > 1: url = Url(sys.argv[1]) - - invoker = SyncRequestClient(url, timeout=2) - try: - REQUESTS= ["Twas brillig, and the slithy toves", - "Did gire and gymble in the wabe.", - "All mimsy were the borogroves,", - "And the mome raths outgrabe."] - for request in REQUESTS: - response = invoker.invoke(Message(body=request)) - print "%s => %s" % (request, response.body) - finally: - invoker.close() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4b154cb/tutorial/tutorial.rst ---------------------------------------------------------------------- diff --git a/tutorial/tutorial.rst b/tutorial/tutorial.rst deleted file mode 100644 index 89eb563..0000000 --- a/tutorial/tutorial.rst +++ /dev/null @@ -1,152 +0,0 @@ -============ -Hello World! -============ - -Tradition dictates that we start with hello world! However rather than -simply striving for the shortest program possible, we'll aim for a -more illustrative example while still restricting the functionality to -sending and receiving a single message. - -.. literalinclude:: helloworld.py - :lines: 21- - :linenos: - -This example uses proton in an event-driven or reactive manner. The -flow of control is an 'event loop', where the events may be triggered -by data arriving on a socket among other things and are then passed to -relevant 'handlers'. 
Applications are then structured as a set of -defined handlers for events of interest; to be notified of a -particular event, you define a class with an appropriately named method -on it, inform the event loop of that method which then calls it -whenever the event occurs. - -The class we define in this example, ``HelloWorld``, has methods to -handle three types of events. - -The first, ``on_connection_opened()``, is called when the connection -is opened, and when that occurs we create a receiver over which to -receive our message and a sender over which to send it. - -The second method, ``on_credit()``, is called when our sender has been -issued by the peer with 'credit', allowing it to send messages. A -credit based flow control mechanism like this ensures we only send -messages when the recipient is ready and able to receive them. This is -particularly important when the volume of messages might be large. In -our case we are just going to send one message. - -The third and final method, ``on_message()``, is called when a message -arrives. Within that method we simply print the body of the message -and then close the connection. - -This particular example assumes a broker (or similar service), which -accepts connections and routes published messages to intended -recipients. The constructor for the ``HelloWorld`` class takes the -details of the broker to connect to, and the address through which the -message is sent and received (for a broker this corresponds to a queue -or topic name). - -After an instance of ``HelloWorld`` is constructed, the event loop is -entered by the call to the ``run()`` method on the last line. This -call will return only when the loop determines there are no more -events possible (at which point our example program will then exit). - -==================== -Hello World, Direct! -==================== - -Though often used in conjunction with a broker, AMQP does not -*require* this.
It also allows senders and receivers to communicate -directly if desired. - -Let's modify our example to demonstrate this. - -.. literalinclude:: helloworld_direct.py - :lines: 21- - :emphasize-lines: 17,33,38 - :linenos: - -The first difference, on line 17, is that rather than creating a -receiver on the same connection as our sender, we listen for incoming -connections by invoking the ``listen()`` method on the ``EventLoop`` -instance. - -Another difference is that the ``EventLoop`` instance we use is not -the default instance as was used in the original example, but one we -construct ourselves on line 38, passing in some event handlers. The -first of these is ``HelloWorldReceiver``, as used in the original -example. We pass it to the event loop, because we aren't going to -directly create the receiver here ourselves. Rather we will accept an -incoming connection on which the message will be received. This -handler would then be notified of any incoming message event on any of -the connections the event loop controls. As well as our own handler, we -specify a couple of useful handlers from the ``proton_events`` -toolkit. The ``Handshaker`` handler will ensure our server follows the -basic handshaking rules laid down by the protocol. The -``FlowController`` will issue credit for incoming messages. We won't -worry about them in more detail than that for now. - -The last difference is that we close the ``acceptor`` returned from -the ``listen()`` call as part of the handling of the connection close -event (line 33). - -So now we have our example working without a broker involved! - -========== -The Basics -========== - -TODO: These examples show reliable (at-least-once) send and receive -with reconnect ability. Need to write some explanation. Could also do -with some further cleanup. - - -.. literalinclude:: simple_recv.py - :lines: 21- - :linenos: - -..
literalinclude:: simple_send.py - :lines: 21- - :linenos: - -================ -Request/Response -================ - -A common pattern is to send a request message and expect a response -message in return. AMQP has special support for this pattern. Let's -have a look at a simple example. We'll start with the 'server', -i.e. the program that will process the request and send the -response. Note that we are still using a broker in this example. - -Our server will provide a very simple service: it will respond with -the body of the request converted to uppercase. - -.. literalinclude:: server.py - :lines: 21- - :linenos: - -The code here is not too different from the simple receiver example. When -we receive a request however, we look at the reply-to address and -create a sender for that over which to send the response. We'll cache -the senders in case we get further requests with the same reply-to. - -Now let's create a simple client to test this service out. - -.. literalinclude:: client.py - :lines: 21- - :linenos: - -As well as sending requests, we need to be able to get back the -responses. We create a receiver for that (see line 8), but we don't -specify an address, we set the dynamic option which tells the broker -we are connected to to create a temporary address over which we can -receive our responses. - -We need to use the address allocated by the broker as the reply_to -address of our requests. To be notified when the broker has sent us -back the address to use, we add an ``on_link_remote_open()`` method to -our receiver's handler, and use that as the trigger to send our first -request.
- - - http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4b154cb/tutorial/tx_recv.py ---------------------------------------------------------------------- diff --git a/tutorial/tx_recv.py b/tutorial/tx_recv.py deleted file mode 100755 index a28a3df..0000000 --- a/tutorial/tx_recv.py +++ /dev/null @@ -1,62 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-# - -from proton.reactors import EventLoop -from proton.handlers import TransactionalClientHandler - -class TxRecv(TransactionalClientHandler): - def __init__(self, batch_size): - super(TxRecv, self).__init__(prefetch=0) - self.current_batch = 0 - self.batch_size = batch_size - self.event_loop = EventLoop(self) - self.conn = self.event_loop.connect("localhost:5672") - self.receiver = self.conn.create_receiver("examples") - self.conn.declare_transaction(handler=self) - self.transaction = None - - def on_message(self, event): - print event.message.body - self.accept(event.delivery, self.transaction) - self.current_batch += 1 - if self.current_batch == self.batch_size: - self.transaction.commit() - self.transaction = None - - def on_transaction_declared(self, event): - self.receiver.flow(self.batch_size) - self.transaction = event.transaction - - def on_transaction_committed(self, event): - self.current_batch = 0 - self.conn.declare_transaction(handler=self) - - def on_disconnected(self, event): - self.current_batch = 0 - - def run(self): - self.event_loop.run() - -try: - TxRecv(10).run() -except KeyboardInterrupt: pass - - - http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4b154cb/tutorial/tx_recv_interactive.py ---------------------------------------------------------------------- diff --git a/tutorial/tx_recv_interactive.py b/tutorial/tx_recv_interactive.py deleted file mode 100755 index a822992..0000000 --- a/tutorial/tx_recv_interactive.py +++ /dev/null @@ -1,83 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import sys -import threading -from proton.reactors import ApplicationEvent, EventLoop -from proton.handlers import TransactionalClientHandler - -class TxRecv(TransactionalClientHandler): - def __init__(self): - super(TxRecv, self).__init__(prefetch=0) - - def on_start(self, event): - self.context = event.reactor.connect("localhost:5672") - self.receiver = self.context.create_receiver("examples") - #self.context.declare_transaction(handler=self, settle_before_discharge=False) - self.context.declare_transaction(handler=self, settle_before_discharge=True) - self.transaction = None - - def on_message(self, event): - print event.message.body - self.transaction.accept(event.delivery) - - def on_transaction_declared(self, event): - self.transaction = event.transaction - print "transaction declared" - - def on_transaction_committed(self, event): - print "transaction committed" - self.context.declare_transaction(handler=self) - - def on_transaction_aborted(self, event): - print "transaction aborted" - self.context.declare_transaction(handler=self) - - def on_commit(self, event): - self.transaction.commit() - - def on_abort(self, event): - self.transaction.abort() - - def on_fetch(self, event): - self.receiver.flow(1) - - def on_quit(self, event): - c = self.receiver.connection - self.receiver.close() - c.close() - -try: - reactor = EventLoop(TxRecv()) - events = reactor.get_event_trigger() - thread = threading.Thread(target=reactor.run) - thread.daemon=True - thread.start() - - print "Enter 'fetch', 'commit' or 'abort'" - while True: - line = 
sys.stdin.readline() - if line: - events.trigger(ApplicationEvent(line.strip())) - else: - break -except KeyboardInterrupt: pass - - http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4b154cb/tutorial/tx_send.py ---------------------------------------------------------------------- diff --git a/tutorial/tx_send.py b/tutorial/tx_send.py deleted file mode 100755 index b2f12b2..0000000 --- a/tutorial/tx_send.py +++ /dev/null @@ -1,76 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-# - -from proton import Message -from proton.reactors import EventLoop -from proton.handlers import TransactionalClientHandler - -class TxSend(TransactionalClientHandler): - def __init__(self, messages, batch_size): - super(TxSend, self).__init__() - self.current_batch = 0 - self.committed = 0 - self.confirmed = 0 - self.total = messages - self.batch_size = batch_size - self.eventloop = EventLoop() - self.conn = self.eventloop.connect("localhost:5672", handler=self) - self.sender = self.conn.create_sender("examples") - self.conn.declare_transaction(handler=self) - self.transaction = None - - def on_transaction_declared(self, event): - self.transaction = event.transaction - self.send() - - def on_credit(self, event): - self.send() - - def send(self): - while self.transaction and self.sender.credit and self.committed < self.total: - msg = Message(body={'sequence':(self.committed+self.current_batch+1)}) - self.sender.send_msg(msg, transaction=self.transaction) - self.current_batch += 1 - if self.current_batch == self.batch_size: - self.transaction.commit() - self.transaction = None - - def on_accepted(self, event): - if event.sender == self.sender: - self.confirmed += 1 - - def on_transaction_committed(self, event): - self.committed += self.current_batch - if self.committed == self.total: - print "all messages committed" - event.connection.close() - else: - self.current_batch = 0 - self.conn.declare_transaction(handler=self) - - def on_disconnected(self, event): - self.current_batch = 0 - - def run(self): - self.eventloop.run() - -try: - TxSend(10000, 10).run() -except KeyboardInterrupt: pass http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4b154cb/tutorial/tx_send_sync.py ---------------------------------------------------------------------- diff --git a/tutorial/tx_send_sync.py b/tutorial/tx_send_sync.py deleted file mode 100755 index 0c50838..0000000 --- a/tutorial/tx_send_sync.py +++ /dev/null @@ -1,77 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the 
Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from proton import Message -from proton.reactors import EventLoop -from proton.handlers import TransactionalClientHandler - -class TxSend(TransactionalClientHandler): - def __init__(self, messages, batch_size): - super(TxSend, self).__init__() - self.current_batch = 0 - self.confirmed = 0 - self.committed = 0 - self.total = messages - self.batch_size = batch_size - self.eventloop = EventLoop() - self.conn = self.eventloop.connect("localhost:5672", handler=self) - self.sender = self.conn.create_sender("examples") - self.conn.declare_transaction(handler=self) - self.transaction = None - - def on_transaction_declared(self, event): - self.transaction = event.transaction - self.send() - - def on_credit(self, event): - self.send() - - def send(self): - while self.transaction and self.current_batch < self.batch_size and self.sender.credit and self.committed < self.total: - msg = Message(body={'sequence':(self.committed+self.current_batch+1)}) - self.sender.send_msg(msg, transaction=self.transaction) - self.current_batch += 1 - - def on_accepted(self, event): - if event.sender == self.sender: - self.confirmed += 1 - if self.confirmed == self.batch_size: - self.transaction.commit() - self.transaction = None 
- self.confirmed = 0 - - def on_transaction_committed(self, event): - self.committed += self.current_batch - if self.committed == self.total: - print "all messages committed" - event.connection.close() - else: - self.current_batch = 0 - self.conn.declare_transaction(handler=self) - - def on_disconnected(self, event): - self.current_batch = 0 - - def run(self): - self.eventloop.run() - -try: - TxSend(10000, 10).run() -except KeyboardInterrupt: pass --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
