Author: kpvdr
Date: Mon Jun 25 19:11:55 2012
New Revision: 1353703
URL: http://svn.apache.org/viewvc?rev=1353703&view=rev
Log:
WIP - transactional consume path completed, still some testing to be done.
Modified:
qpid/branches/asyncstore/cpp/src/qpid/broker/TxnBuffer.cpp
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/TxnBuffer.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/TxnBuffer.cpp?rev=1353703&r1=1353702&r2=1353703&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/TxnBuffer.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/TxnBuffer.cpp Mon Jun 25
19:11:55 2012
@@ -111,7 +111,6 @@ TxnBuffer::handleAsyncResult(const Async
if (arh) {
boost::shared_ptr<TxnAsyncContext> tac =
boost::dynamic_pointer_cast<TxnAsyncContext>(arh->getBrokerAsyncContext());
if (arh->getErrNo()) {
- tac->getTxnBuffer()->asyncLocalAbort();
std::cerr << "Transaction xid=\"" <<
tac->getTransactionContext().getXid() << "\": Operation " << tac->getOpStr() <<
": failure "
<< arh->getErrNo() << " (" << arh->getErrMsg() << ")" <<
std::endl;
tac->getTxnBuffer()->asyncLocalAbort();
@@ -153,7 +152,10 @@ TxnBuffer::asyncLocalCommit()
//std::cout << "TTT TxnBuffer:asyncLocalCommit: COMMIT->COMPLETE" << std::endl
<< std::flush;
commit();
m_state = COMPLETE;
- //delete this; // TODO: ugly! Find a better way to handle the life
cycle of this class
+ delete this; // TODO: ugly! Find a better way to handle the life cycle
of this class
+ break;
+// case COMPLETE:
+//std::cout << "TTT TxnBuffer:asyncLocalCommit: COMPLETE" << std::endl <<
std::flush;
break;
default: ;
//std::cout << "TTT TxnBuffer:asyncLocalCommit: Unexpected state " << m_state
<< std::endl << std::flush;
@@ -183,7 +185,7 @@ TxnBuffer::asyncLocalAbort()
//std::cout << "TTT TxnBuffer:asyncRollback: ROLLBACK->COMPLETE" << std::endl
<< std::flush;
rollback();
m_state = COMPLETE;
- //delete this; // TODO: ugly! Find a better way to handle the life
cycle of this class
+ delete this; // TODO: ugly! Find a better way to handle the life cycle
of this class
default: ;
//std::cout << "TTT TxnBuffer:asyncRollback: Unexpected state " << m_state <<
std::endl << std::flush;
}
Modified:
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp?rev=1353703&r1=1353702&r2=1353703&view=diff
==============================================================================
---
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp
(original)
+++
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp
Mon Jun 25 19:11:55 2012
@@ -23,6 +23,7 @@
#include "DeliveryRecord.h"
+#include "MessageConsumer.h"
#include "SimpleMessage.h"
#include "SimpleQueue.h"
@@ -31,8 +32,10 @@ namespace storePerftools {
namespace asyncPerf {
DeliveryRecord::DeliveryRecord(const QueuedMessage& qm,
+ MessageConsumer& mc,
bool accepted) :
m_queuedMessage(qm),
+ m_msgConsumer(mc),
m_accepted(accepted),
m_ended(accepted)
{}
@@ -41,17 +44,29 @@ DeliveryRecord::~DeliveryRecord()
{}
bool
-DeliveryRecord::accept(qpid::broker::TxnHandle* txn)
+DeliveryRecord::accept()
{
if (!m_ended) {
- assert(m_queuedMessage.getQueue());
- m_queuedMessage.getQueue()->dequeue(*txn, m_queuedMessage);
+ m_queuedMessage.getQueue()->dequeue(m_queuedMessage);
m_accepted = true;
setEnded();
}
return isRedundant();
}
+/*
+bool
+DeliveryRecord::accept(qpid::broker::TxnHandle& txn)
+{
+ if (!m_ended) {
+ m_queuedMessage.getQueue()->dequeue(txn, m_queuedMessage);
+ m_accepted = true;
+ setEnded();
+ }
+ return isRedundant();
+}
+*/
+
bool
DeliveryRecord::isAccepted() const
{
@@ -78,5 +93,24 @@ DeliveryRecord::isRedundant() const
return m_ended;
}
+void
+DeliveryRecord::dequeue(qpid::broker::TxnHandle& txn)
+{
+ m_queuedMessage.getQueue()->dequeue(txn, m_queuedMessage);
+}
+
+void
+DeliveryRecord::committed() const
+{
+//std::cout << "DeliveryRecord::committed()" << std::endl << std::flush;
+ m_msgConsumer.dequeueComplete();
+ //m_queuedMessage.getQueue()->dequeueCommitted(m_queuedMessage);
+}
+
+QueuedMessage
+DeliveryRecord::getQueuedMessage() const
+{
+ return m_queuedMessage;
+}
}}} // namespace tests::storePerftools::asyncPerf
Modified:
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h
URL:
http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h?rev=1353703&r1=1353702&r2=1353703&view=diff
==============================================================================
---
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h
(original)
+++
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h
Mon Jun 25 19:11:55 2012
@@ -35,18 +35,26 @@ namespace tests {
namespace storePerftools {
namespace asyncPerf {
+class MessageConsumer;
+
class DeliveryRecord {
public:
DeliveryRecord(const QueuedMessage& qm,
+ MessageConsumer& mc,
bool accepted);
virtual ~DeliveryRecord();
- bool accept(qpid::broker::TxnHandle* txn);
+ bool accept();
+// bool accept(qpid::broker::TxnHandle& txn);
bool isAccepted() const;
bool setEnded();
bool isEnded() const;
bool isRedundant() const;
+ void dequeue(qpid::broker::TxnHandle& txn);
+ void committed() const;
+ QueuedMessage getQueuedMessage() const;
private:
QueuedMessage m_queuedMessage;
+ MessageConsumer& m_msgConsumer;
bool m_accepted : 1;
bool m_ended : 1;
};
Modified:
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp?rev=1353703&r1=1353702&r2=1353703&view=diff
==============================================================================
---
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
(original)
+++
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
Mon Jun 25 19:11:55 2012
@@ -23,8 +23,10 @@
#include "MessageConsumer.h"
+#include "DeliveryRecord.h"
#include "SimpleQueue.h"
#include "TestOptions.h"
+#include "TxnAccept.h"
#include "qpid/asyncStore/AsyncStoreImpl.h"
#include "qpid/broker/TxnBuffer.h"
@@ -48,26 +50,65 @@ MessageConsumer::MessageConsumer(const T
MessageConsumer::~MessageConsumer()
{}
+void
+MessageConsumer::record(boost::shared_ptr<DeliveryRecord> dr)
+{
+ // TODO: May need a lock?
+ m_unacked.push_back(dr);
+}
+
+void
+MessageConsumer::dequeueComplete()
+{
+//std::cout << "MessageConsumer::dequeueComplete()" << std::endl << std::flush;
+ // TODO: May need a lock
+ //++m_numMsgs;
+}
+
void*
MessageConsumer::runConsumers()
{
const bool useTxns = m_perfTestParams.m_deqTxnBlockSize > 0U;
- uint16_t txnCnt = 0U;
+ uint16_t opsInTxnCnt = 0U;
qpid::broker::TxnBuffer* tb = 0;
if (useTxns) {
tb = new qpid::broker::TxnBuffer(m_resultQueue);
}
- uint32_t numMsgs = 0;
+ uint32_t numMsgs = 0UL;
while (numMsgs < m_perfTestParams.m_numMsgs) {
- if (m_queue->dispatch()) {
+ if (m_queue->dispatch(*this)) {
++numMsgs;
+ if (useTxns) {
+ // --- Transactional dequeue ---
+ if (++opsInTxnCnt >= m_perfTestParams.m_deqTxnBlockSize) {
+ if (m_perfTestParams.m_durable) {
+ boost::shared_ptr<TxnAccept> ta(new
TxnAccept(m_unacked));
+ m_unacked.clear();
+ tb->enlist(ta);
+ tb->commitLocal(m_store);
+ if (numMsgs < m_perfTestParams.m_numMsgs) {
+ tb = new qpid::broker::TxnBuffer(m_resultQueue);
+ }
+ } else {
+ tb->commit();
+ }
+ opsInTxnCnt = 0U;
+ }
+ } else {
+ // --- Non-transactional dequeue ---
+ for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator
i = m_unacked.begin(); i != m_unacked.end(); ++i) {
+ (*i)->accept();
+ }
+ m_unacked.clear();
+ //++numMsgs;
+ }
} else {
::usleep(1000); // TODO - replace this poller with condition
variable
}
}
- if (txnCnt) {
+ if (opsInTxnCnt) {
if (m_perfTestParams.m_durable) {
tb->commitLocal(m_store);
} else {
Modified:
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h
URL:
http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h?rev=1353703&r1=1353702&r2=1353703&view=diff
==============================================================================
---
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h
(original)
+++
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h
Mon Jun 25 19:11:55 2012
@@ -25,6 +25,7 @@
#define tests_storePerftools_asyncPerf_MessageConsumer_h_
#include "boost/shared_ptr.hpp"
+#include <deque>
namespace qpid {
namespace asyncStore {
@@ -38,6 +39,7 @@ namespace tests {
namespace storePerftools {
namespace asyncPerf {
+class DeliveryRecord;
class SimpleQueue;
class TestOptions;
@@ -49,6 +51,8 @@ public:
qpid::broker::AsyncResultQueue& arq,
boost::shared_ptr<SimpleQueue> queue);
virtual ~MessageConsumer();
+ void record(boost::shared_ptr<DeliveryRecord> dr);
+ void dequeueComplete();
void* runConsumers();
static void* startConsumers(void* ptr);
@@ -57,6 +61,7 @@ private:
qpid::asyncStore::AsyncStoreImpl* m_store;
qpid::broker::AsyncResultQueue& m_resultQueue;
boost::shared_ptr<SimpleQueue> m_queue;
+ std::deque<boost::shared_ptr<DeliveryRecord> > m_unacked;
};
}}} // namespace tests::storePerftools::asyncPerf
Modified:
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp?rev=1353703&r1=1353702&r2=1353703&view=diff
==============================================================================
---
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp
(original)
+++
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp
Mon Jun 25 19:11:55 2012
@@ -56,7 +56,7 @@ void*
MessageProducer::runProducers()
{
const bool useTxns = m_perfTestParams.m_enqTxnBlockSize > 0U;
- uint16_t txnCnt = 0U;
+ uint16_t recsInTxnCnt = 0U;
qpid::broker::TxnBuffer* tb = 0;
if (useTxns) {
tb = new qpid::broker::TxnBuffer(m_resultQueue);
@@ -67,27 +67,26 @@ MessageProducer::runProducers()
boost::shared_ptr<TxnPublish> op(new TxnPublish(msg));
op->deliverTo(m_queue);
tb->enlist(op);
- if (++txnCnt >= m_perfTestParams.m_enqTxnBlockSize) {
+ if (++recsInTxnCnt >= m_perfTestParams.m_enqTxnBlockSize) {
if (m_perfTestParams.m_durable) {
tb->commitLocal(m_store);
// TxnBuffer instance tb carries async state that
precludes it being re-used for the next
// transaction until the current commit cycle completes.
So use another instance. This
// instance should auto-delete when the async commit cycle
completes.
- if (numMsgs<m_perfTestParams.m_numMsgs) {
- //tb = boost::shared_ptr<qpid::broker::TxnBuffer>(new
qpid::broker::TxnBuffer(m_resultQueue));
+ if ((numMsgs + 1) < m_perfTestParams.m_numMsgs) {
tb = new qpid::broker::TxnBuffer(m_resultQueue);
}
} else {
tb->commit();
}
- txnCnt = 0U;
+ recsInTxnCnt = 0U;
}
} else {
m_queue->deliver(msg);
}
}
- if (txnCnt) {
+ if (recsInTxnCnt) {
if (m_perfTestParams.m_durable) {
tb->commitLocal(m_store);
} else {
Modified:
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp?rev=1353703&r1=1353702&r2=1353703&view=diff
==============================================================================
---
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
(original)
+++
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
Mon Jun 25 19:11:55 2012
@@ -23,6 +23,8 @@
#include "SimpleQueue.h"
+#include "DeliveryRecord.h"
+#include "MessageConsumer.h"
#include "MessageDeque.h"
#include "SimpleMessage.h"
#include "QueueAsyncContext.h"
@@ -63,12 +65,7 @@ SimpleQueue::SimpleQueue(const std::stri
}
SimpleQueue::~SimpleQueue()
-{
-// m_store->flush(*this);
- // TODO: Make destroying the store a test parameter
-// m_store->destroy(*this);
-// m_store = 0;
-}
+{}
// static
void
@@ -170,16 +167,24 @@ SimpleQueue::deliver(boost::intrusive_pt
}
bool
-SimpleQueue::dispatch()
+SimpleQueue::dispatch(MessageConsumer& mc)
{
QueuedMessage qm;
if (m_messages->consume(qm)) {
- return dequeue(s_nullTxnHandle, qm);
+ boost::shared_ptr<DeliveryRecord> dr(new DeliveryRecord(qm, mc,
false));
+ mc.record(dr);
+ return true;
}
return false;
}
bool
+SimpleQueue::enqueue(QueuedMessage& qm)
+{
+ return enqueue(s_nullTxnHandle, qm);
+}
+
+bool
SimpleQueue::enqueue(qpid::broker::TxnHandle& th,
QueuedMessage& qm)
{
@@ -195,6 +200,12 @@ SimpleQueue::enqueue(qpid::broker::TxnHa
}
bool
+SimpleQueue::dequeue(QueuedMessage& qm)
+{
+ return dequeue(s_nullTxnHandle, qm);
+}
+
+bool
SimpleQueue::dequeue(qpid::broker::TxnHandle& th,
QueuedMessage& qm)
{
Modified:
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
URL:
http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h?rev=1353703&r1=1353702&r2=1353703&view=diff
==============================================================================
---
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
(original)
+++
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
Mon Jun 25 19:11:55 2012
@@ -50,6 +50,7 @@ namespace tests {
namespace storePerftools {
namespace asyncPerf {
+class MessageConsumer;
class Messages;
class SimpleMessage;
class QueueAsyncContext;
@@ -76,9 +77,11 @@ public:
// --- Methods in msg handling path from qpid::Queue ---
void deliver(boost::intrusive_ptr<SimpleMessage> msg);
- bool dispatch(); // similar to qpid::broker::Queue::distpatch(Consumer&)
but without Consumer param
+ bool dispatch(MessageConsumer& mc);
+ bool enqueue(QueuedMessage& qm);
bool enqueue(qpid::broker::TxnHandle& th,
QueuedMessage& qm);
+ bool dequeue(QueuedMessage& qm);
bool dequeue(qpid::broker::TxnHandle& th,
QueuedMessage& qm);
void process(boost::intrusive_ptr<SimpleMessage> msg);
Modified:
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp?rev=1353703&r1=1353702&r2=1353703&view=diff
==============================================================================
---
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp
(original)
+++
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp
Mon Jun 25 19:11:55 2012
@@ -23,11 +23,14 @@
#include "TxnAccept.h"
+#include "DeliveryRecord.h"
+
namespace tests {
namespace storePerftools {
namespace asyncPerf {
-TxnAccept::TxnAccept()
+TxnAccept::TxnAccept(std::deque<boost::shared_ptr<DeliveryRecord> >& ops) :
+ m_ops(ops.begin(), ops.end())
{}
TxnAccept::~TxnAccept()
@@ -36,17 +39,42 @@ TxnAccept::~TxnAccept()
// --- Interface TxnOp ---
bool
-TxnAccept::prepare(qpid::broker::TxnHandle& /*th*/) throw()
+TxnAccept::prepare(qpid::broker::TxnHandle& th) throw()
{
+//std::cout << "TTT TxnAccept::prepare" << std::endl << std::flush;
+ try {
+ for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator i =
m_ops.begin(); i != m_ops.end(); ++i) {
+ (*i)->dequeue(th);
+ }
+ } catch (const std::exception& e) {
+ std::cerr << "TxnAccept: Failed to prepare transaction: " << e.what()
<< std::endl;
+ } catch (...) {
+ std::cerr << "TxnAccept: Failed to prepare transaction: (unknown
error)" << std::endl;
+ }
return false;
}
void
TxnAccept::commit() throw()
-{}
+{
+//std::cout << "TTT TxnAccept::commit" << std::endl << std::flush;
+ try {
+ for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator
i=m_ops.begin(); i!=m_ops.end(); ++i) {
+ (*i)->committed();
+ (*i)->setEnded();
+ }
+ //m_ops.clear();
+ } catch (const std::exception& e) {
+ std::cerr << "TxnAccept: Failed to commit transaction: " << e.what()
<< std::endl;
+ } catch(...) {
+ std::cerr << "TxnAccept: Failed to commit transaction: (unknown
error)" << std::endl;
+ }
+}
void
TxnAccept::rollback() throw()
-{}
+{
+//std::cout << "TTT TxnAccept::rollback" << std::endl << std::flush;
+}
}}} // namespace tests::storePerftools::asyncPerf
Modified:
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h
URL:
http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h?rev=1353703&r1=1353702&r2=1353703&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h
(original)
+++ qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h
Mon Jun 25 19:11:55 2012
@@ -26,19 +26,26 @@
#include "qpid/broker/TxnOp.h"
+#include "boost/shared_ptr.hpp"
+#include <deque>
+
namespace tests {
namespace storePerftools {
namespace asyncPerf {
+class DeliveryRecord;
+
class TxnAccept: public qpid::broker::TxnOp {
public:
- TxnAccept();
+ TxnAccept(std::deque<boost::shared_ptr<DeliveryRecord> >& ops);
virtual ~TxnAccept();
// --- Interface TxnOp ---
bool prepare(qpid::broker::TxnHandle& th) throw();
void commit() throw();
void rollback() throw();
+private:
+ std::deque<boost::shared_ptr<DeliveryRecord> > m_ops;
};
}}} // namespace tests::storePerftools::asyncPerf
Modified:
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp?rev=1353703&r1=1353702&r2=1353703&view=diff
==============================================================================
---
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp
(original)
+++
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp
Mon Jun 25 19:11:55 2012
@@ -52,9 +52,9 @@ TxnPublish::prepare(qpid::broker::TxnHan
}
return true;
} catch (const std::exception& e) {
- std::cerr << "Failed to prepare transaction: " << e.what() <<
std::endl;
+ std::cerr << "TxnPublish: Failed to prepare transaction: " << e.what()
<< std::endl;
} catch (...) {
- std::cerr << "Failed to prepare transaction: (unknown error)" <<
std::endl;
+ std::cerr << "TxnPublish: Failed to prepare transaction: (unknown
error)" << std::endl;
}
return false;
}
@@ -68,9 +68,9 @@ TxnPublish::commit() throw()
(*i)->commitEnqueue();
}
} catch (const std::exception& e) {
- std::cerr << "Failed to commit transaction: " << e.what() << std::endl;
+ std::cerr << "TxnPublish: Failed to commit transaction: " << e.what()
<< std::endl;
} catch (...) {
- std::cerr << "Failed to commit transaction: (unknown error)" <<
std::endl;
+ std::cerr << "TxnPublish: Failed to commit transaction: (unknown
error)" << std::endl;
}
}
@@ -83,9 +83,9 @@ TxnPublish::rollback() throw()
(*i)->abortEnqueue();
}
} catch (const std::exception& e) {
- std::cerr << "Failed to rollback transaction: " << e.what() <<
std::endl;
+ std::cerr << "TxnPublish: Failed to rollback transaction: " <<
e.what() << std::endl;
} catch (...) {
- std::cerr << "Failed to rollback transaction: (unknown error)" <<
std::endl;
+ std::cerr << "TxnPublish: Failed to rollback transaction: (unknown
error)" << std::endl;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]