Author: gsim
Date: Wed Oct 29 16:40:12 2014
New Revision: 1635195
URL: http://svn.apache.org/r1635195
Log:
Added a couple of new transactional examples
Added:
qpid/proton/branches/examples/tutorial/tx_recv.py (with props)
qpid/proton/branches/examples/tutorial/tx_send.py (with props)
qpid/proton/branches/examples/tutorial/tx_send_sync.py (with props)
Modified:
qpid/proton/branches/examples/tutorial/proton_events.py
qpid/proton/branches/examples/tutorial/server_tx.py
Modified: qpid/proton/branches/examples/tutorial/proton_events.py
URL:
http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/proton_events.py?rev=1635195&r1=1635194&r2=1635195&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/proton_events.py (original)
+++ qpid/proton/branches/examples/tutorial/proton_events.py Wed Oct 29 16:40:12
2014
@@ -639,6 +639,10 @@ class ClientHandler(ClientEndpointHandle
IncomingMessageHandler.on_delivery(self, event)
OutgoingMessageHandler.on_delivery(self, event)
+ def on_settled(self, event):
+ IncomingMessageHandler.on_settled(self, event)
+ OutgoingMessageHandler.on_settled(self, event)
+
def delivery_tags():
count = 1
while True:
@@ -659,7 +663,7 @@ def send_msg(sender, msg, tag=None, hand
def _send_msg(self, msg, tag=None, handler=None, transaction=None):
return send_msg(self, msg, tag, handler, transaction)
-class TxHandler(OutgoingMessageHandler):
+class TransactionHandler(OutgoingMessageHandler):
def on_settled(self, event):
if hasattr(event.delivery, "transaction"):
event.transaction = event.delivery.transaction
@@ -680,6 +684,17 @@ class TxHandler(OutgoingMessageHandler):
delivery.update(0x34)
delivery.settle()
+class TransactionalClientHandler(ClientEndpointHandler, TransactionHandler,
IncomingMessageHandler):
+ def __init__(self):
+ super(TransactionalClientHandler, self).__init__()
+
+ def on_delivery(self, event):
+ IncomingMessageHandler.on_delivery(self, event)
+ TransactionHandler.on_delivery(self, event)
+
+ def on_settled(self, event):
+ IncomingMessageHandler.on_settled(self, event)
+ TransactionHandler.on_settled(self, event)
class Transaction(object):
def __init__(self, txn_ctrl, handler):
Modified: qpid/proton/branches/examples/tutorial/server_tx.py
URL:
http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/server_tx.py?rev=1635195&r1=1635194&r2=1635195&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/server_tx.py (original)
+++ qpid/proton/branches/examples/tutorial/server_tx.py Wed Oct 29 16:40:12 2014
@@ -19,9 +19,9 @@
#
from proton import Message
-from proton_events import EventLoop, ClientHandler, TxHandler
+from proton_events import EventLoop, ClientHandler, TransactionHandler
-class TxRequest(TxHandler):
+class TxRequest(TransactionHandler):
def __init__(self, response, sender, request_delivery, conn):
self.response = response
self.sender = sender
Added: qpid/proton/branches/examples/tutorial/tx_recv.py
URL:
http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/tx_recv.py?rev=1635195&view=auto
==============================================================================
--- qpid/proton/branches/examples/tutorial/tx_recv.py (added)
+++ qpid/proton/branches/examples/tutorial/tx_recv.py Wed Oct 29 16:40:12 2014
@@ -0,0 +1,62 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import proton_events
+
+class TxRecv(proton_events.TransactionalClientHandler):
+ def __init__(self, batch_size):
+ self.current_batch = 0
+ self.batch_size = batch_size
+ self.event_loop = proton_events.EventLoop(self)
+ self.conn = self.event_loop.connect("localhost:5672")
+ self.receiver = self.conn.create_receiver("examples")
+ self.conn.declare_transaction(handler=self)
+ self.transaction = None
+
+ def on_message(self, event):
+ print event.message.body
+ self.accept(event.delivery, self.transaction)
+ self.current_batch += 1
+ if self.current_batch == self.batch_size:
+ self.transaction.commit()
+ self.transaction = None
+
+ def on_transaction_declared(self, event):
+ self.receiver.flow(self.batch_size)
+ self.transaction = event.transaction
+
+ def auto_accept(self): return False
+
+ def on_transaction_committed(self, event):
+ self.current_batch = 0
+ self.conn.declare_transaction(handler=self)
+
+ def on_disconnected(self, event):
+ self.current_batch = 0
+
+ def run(self):
+ self.event_loop.run()
+
+try:
+ TxRecv(10).run()
+except KeyboardInterrupt: pass
+
+
+
Propchange: qpid/proton/branches/examples/tutorial/tx_recv.py
------------------------------------------------------------------------------
svn:executable = *
Added: qpid/proton/branches/examples/tutorial/tx_send.py
URL:
http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/tx_send.py?rev=1635195&view=auto
==============================================================================
--- qpid/proton/branches/examples/tutorial/tx_send.py (added)
+++ qpid/proton/branches/examples/tutorial/tx_send.py Wed Oct 29 16:40:12 2014
@@ -0,0 +1,73 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton import Message
+import proton_events
+
+class TxSend(proton_events.TransactionalClientHandler):
+ def __init__(self, messages, batch_size):
+ self.current_batch = 0
+ self.committed = 0
+ self.total = messages
+ self.batch_size = batch_size
+ self.conn = proton_events.connect("localhost:5672", handler=self)
+ self.sender = self.conn.create_sender("examples")
+ self.conn.declare_transaction(handler=self)
+ self.transaction = None
+
+ def on_transaction_declared(self, event):
+ self.transaction = event.transaction
+ self.send()
+
+ def on_credit(self, event):
+ self.send()
+
+ def send(self):
+ while self.transaction and self.sender.credit and self.committed <
self.total:
+ msg =
Message(body={'sequence':(self.committed+self.current_batch+1)})
+ self.sender.send_msg(msg, transaction=self.transaction)
+ self.current_batch += 1
+ if self.current_batch == self.batch_size:
+ self.transaction.commit()
+ self.transaction = None
+
+ def on_accepted(self, event):
+ if event.sender == self.sender:
+ self.confirmed += 1
+
+ def on_transaction_committed(self, event):
+ self.committed += self.current_batch
+ if self.committed == self.total:
+ print "all messages committed"
+ event.connection.close()
+ else:
+ self.current_batch = 0
+ self.conn.declare_transaction(handler=self)
+
+ def on_disconnected(self, event):
+ self.current_batch = 0
+
+ def run(self):
+ proton_events.run()
+
+try:
+ #TxSend(10000, 10).run()
+ TxSend(9, 3).run()
+except KeyboardInterrupt: pass
Propchange: qpid/proton/branches/examples/tutorial/tx_send.py
------------------------------------------------------------------------------
svn:executable = *
Added: qpid/proton/branches/examples/tutorial/tx_send_sync.py
URL:
http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/tx_send_sync.py?rev=1635195&view=auto
==============================================================================
--- qpid/proton/branches/examples/tutorial/tx_send_sync.py (added)
+++ qpid/proton/branches/examples/tutorial/tx_send_sync.py Wed Oct 29 16:40:12
2014
@@ -0,0 +1,74 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton import Message
+import proton_events
+
+class TxSend(proton_events.TransactionalClientHandler):
+ def __init__(self, messages, batch_size):
+ self.current_batch = 0
+ self.confirmed = 0
+ self.committed = 0
+ self.total = messages
+ self.batch_size = batch_size
+ self.conn = proton_events.connect("localhost:5672", handler=self)
+ self.sender = self.conn.create_sender("examples")
+ self.conn.declare_transaction(handler=self)
+ self.transaction = None
+
+ def on_transaction_declared(self, event):
+ self.transaction = event.transaction
+ self.send()
+
+ def on_credit(self, event):
+ self.send()
+
+ def send(self):
+ while self.transaction and self.current_batch < self.batch_size and
self.sender.credit and self.committed < self.total:
+ msg =
Message(body={'sequence':(self.committed+self.current_batch+1)})
+ self.sender.send_msg(msg, transaction=self.transaction)
+ self.current_batch += 1
+
+ def on_accepted(self, event):
+ if event.sender == self.sender:
+ self.confirmed += 1
+ if self.confirmed == self.batch_size:
+ self.transaction.commit()
+ self.transaction = None
+ self.confirmed = 0
+
+ def on_transaction_committed(self, event):
+ self.committed += self.current_batch
+ if self.committed == self.total:
+ print "all messages committed"
+ event.connection.close()
+ else:
+ self.current_batch = 0
+ self.conn.declare_transaction(handler=self)
+
+ def on_disconnected(self, event):
+ self.current_batch = 0
+
+ def run(self):
+ proton_events.run()
+
+try:
+ TxSend(10000, 10).run()
+except KeyboardInterrupt: pass
Propchange: qpid/proton/branches/examples/tutorial/tx_send_sync.py
------------------------------------------------------------------------------
svn:executable = *
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]