Repository: qpid-proton Updated Branches: refs/heads/examples 4706c2b7c -> a37304f96
Support two different settlement modes with transactions; added interactive script for testing/troubleshooting Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/a37304f9 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/a37304f9 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/a37304f9 Branch: refs/heads/examples Commit: a37304f9684d3cbe225d57ebc76542350e9efea8 Parents: 4706c2b Author: Gordon Sim <[email protected]> Authored: Wed Nov 26 11:47:50 2014 +0000 Committer: Gordon Sim <[email protected]> Committed: Wed Nov 26 11:47:50 2014 +0000 ---------------------------------------------------------------------- tutorial/proton_reactors.py | 33 ++++++++++++-- tutorial/tx_recv_interactive.py | 83 ++++++++++++++++++++++++++++++++++++ 2 files changed, 112 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a37304f9/tutorial/proton_reactors.py ---------------------------------------------------------------------- diff --git a/tutorial/proton_reactors.py b/tutorial/proton_reactors.py index 28f287d..f431b24 100644 --- a/tutorial/proton_reactors.py +++ b/tutorial/proton_reactors.py @@ -417,13 +417,15 @@ def _send_msg(self, msg, tag=None, handler=None, transaction=None): class Transaction(object): - def __init__(self, txn_ctrl, handler): + def __init__(self, txn_ctrl, handler, settle_before_discharge=False): self.txn_ctrl = txn_ctrl self.handler = handler self.id = None self._declare = None self._discharge = None self.failed = False + self._pending = [] + self.settle_before_discharge = settle_before_discharge self.declare() def commit(self): @@ -444,6 +446,27 @@ class Transaction(object): delivery.transaction = self return delivery + def accept(self, delivery): + self.update(delivery, PN_ACCEPTED) + if self.settle_before_discharge: + delivery.settle() + else: + self._pending.append(delivery) + + def update(self, delivery, state=None): + if state: + delivery.local.data = [self.id, Described(ulong(state), [])] + delivery.update(0x34) + + def _release_pending(self): + for d in self._pending: + d.update(Delivery.RELEASED) + d.settle() + self._clear_pending() + + def _clear_pending(self): + self._pending = [] + def handle_outcome(self, event): if event.delivery == self._declare: if event.delivery.remote.data: @@ -458,12 +481,14 @@ class Transaction(object): if event.delivery.remote_state == Delivery.REJECTED: if not self.failed: self.handler.on_transaction_commit_failed(event) + self._release_pending() # make this optional? else: if self.failed: self.handler.on_transaction_aborted(event) + self._release_pending() else: self.handler.on_transaction_committed(event) - + self._clear_pending() class LinkOption(object): def apply(self, link): pass @@ -554,12 +579,12 @@ class MessagingContext(object): def create_session(self): return MessageContext(conn=None, ssn=self._new_ssn()) - def declare_transaction(self, handler=None): + def declare_transaction(self, handler=None, settle_before_discharge=False): if not self.txn_ctrl: self.txn_ctrl = self.create_sender(None, name="txn-ctrl") self.txn_ctrl.target.type = Terminus.COORDINATOR self.txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions')) - return Transaction(self.txn_ctrl, handler) + return Transaction(self.txn_ctrl, handler, settle_before_discharge) def close(self): if self.ssn: http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a37304f9/tutorial/tx_recv_interactive.py ---------------------------------------------------------------------- diff --git a/tutorial/tx_recv_interactive.py b/tutorial/tx_recv_interactive.py new file mode 100755 index 0000000..4e36534 --- /dev/null +++ b/tutorial/tx_recv_interactive.py @@ -0,0 +1,83 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import sys +import threading +from proton_reactors import ApplicationEvent, EventLoop +from proton_handlers import TransactionalClientHandler + +class TxRecv(TransactionalClientHandler): + def __init__(self): + super(TxRecv, self).__init__(prefetch=0) + + def on_start(self, event): + self.context = event.reactor.connect("localhost:5672") + self.receiver = self.context.create_receiver("examples") + #self.context.declare_transaction(handler=self, settle_before_discharge=False) + self.context.declare_transaction(handler=self, settle_before_discharge=True) + self.transaction = None + + def on_message(self, event): + print event.message.body + self.transaction.accept(event.delivery) + + def on_transaction_declared(self, event): + self.transaction = event.transaction + print "transaction declared" + + def on_transaction_committed(self, event): + print "transaction committed" + self.context.declare_transaction(handler=self) + + def on_transaction_aborted(self, event): + print "transaction aborted" + self.context.declare_transaction(handler=self) + + def on_commit(self, event): + self.transaction.commit() + + def on_abort(self, event): + self.transaction.abort() + + def on_fetch(self, event): + self.receiver.flow(1) + + def on_quit(self, event): + c = self.receiver.connection + self.receiver.close() + c.close() + +try: + reactor = EventLoop(TxRecv()) + events = reactor.get_event_trigger() + thread = threading.Thread(target=reactor.run) + thread.daemon=True + thread.start() + + print "Enter 'fetch', 'commit' or 'abort'" + while True: + line = sys.stdin.readline() + if line: + events.trigger(ApplicationEvent(line.strip())) + else: + break +except KeyboardInterrupt: pass + + --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
