Author: gsim
Date: Thu Nov 20 15:23:08 2014
New Revision: 1640755
URL: http://svn.apache.org/r1640755
Log:
QPID-4710: release messages on rollback
Modified:
qpid/trunk/qpid/cpp/src/qpid/amqp/descriptors.h
qpid/trunk/qpid/cpp/src/qpid/broker/TxDequeue.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/TxDequeue.h
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/amqp/descriptors.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp/descriptors.h?rev=1640755&r1=1640754&r2=1640755&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp/descriptors.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp/descriptors.h Thu Nov 20 15:23:08 2014
@@ -127,7 +127,7 @@ const std::string RESOURCE_DELETED("amqp
const std::string PRECONDITION_FAILED("amqp:precondition-failed");
namespace transaction {
const std::string UNKNOWN_ID("amqp:transaction:unknown-id");
-const std::string ROLLBACK("amqp:transaction:rolback");
+const std::string ROLLBACK("amqp:transaction:rollback");
}
}
}} // namespace qpid::amqp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TxDequeue.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TxDequeue.cpp?rev=1640755&r1=1640754&r2=1640755&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TxDequeue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TxDequeue.cpp Thu Nov 20 15:23:08 2014
@@ -28,7 +28,7 @@ namespace broker {
TxDequeue::TxDequeue(QueueCursor m, boost::shared_ptr<Queue> q,
qpid::framing::SequenceNumber mId,
qpid::framing::SequenceNumber rId)
- : message(m), queue(q), messageId(mId), replicationId(rId) {}
+ : message(m), queue(q), messageId(mId), replicationId(rId),
releaseOnAbort(true), redeliveredOnAbort(true) {}
bool TxDequeue::prepare(TransactionContext* ctxt) throw()
{
@@ -55,7 +55,10 @@ void TxDequeue::commit() throw()
}
}
-void TxDequeue::rollback() throw() {}
+void TxDequeue::rollback() throw()
+{
+ if (releaseOnAbort) queue->release(message, redeliveredOnAbort);
+}
void TxDequeue::callObserver(const boost::shared_ptr<TransactionObserver>&
observer)
{
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TxDequeue.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TxDequeue.h?rev=1640755&r1=1640754&r2=1640755&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TxDequeue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TxDequeue.h Thu Nov 20 15:23:08 2014
@@ -46,6 +46,8 @@ class TxDequeue: public TxOp
boost::shared_ptr<Queue> queue;
qpid::framing::SequenceNumber messageId;
qpid::framing::SequenceNumber replicationId;
+ bool releaseOnAbort;
+ bool redeliveredOnAbort;
};
}} // namespace qpid::broker
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1640755&r1=1640754&r2=1640755&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp Thu Nov 20 15:23:08
2014
@@ -812,6 +812,7 @@ void Session::abort()
if (txn) {
txn->rollback();
txAborted();
+ txn = boost::intrusive_ptr<TxBuffer>();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]