Author: shuston
Date: Tue Nov 3 00:53:24 2009
New Revision: 832234
URL: http://svn.apache.org/viewvc?rev=832234&view=rev
Log:
When storing a message, set its new persistence Id. Don't throw an exception
when attempting to enqueue/dequeue a message without a AMQP transaction
associated. Mark queued/dequeued messages complete right after storing in
database; there's no delayed notify needed for this storage provider.
Fixes QPID-2170.
Modified:
qpid/trunk/qpid/cpp/src/qpid/store/CMakeLists.txt
qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/store/CMakeLists.txt
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/CMakeLists.txt?rev=832234&r1=832233&r2=832234&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/CMakeLists.txt Tue Nov 3 00:53:24 2009
@@ -73,7 +73,7 @@
ms-sql/Recordset.cpp
ms-sql/State.cpp
ms-sql/VariantHelper.cpp)
- target_link_libraries (mssql_store qpidcommon
${Boost_PROGRAM_OPTIONS_LIBRARY})
+ target_link_libraries (mssql_store qpidbroker qpidcommon
${Boost_PROGRAM_OPTIONS_LIBRARY})
install (TARGETS mssql_store # RUNTIME
DESTINATION ${QPIDD_MODULE_DIR}
COMPONENT ${QPID_COMPONENT_BROKER})
Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp?rev=832234&r1=832233&r2=832234&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp Tue Nov 3
00:53:24 2009
@@ -689,32 +689,49 @@
const boost::intrusive_ptr<PersistableMessage>& msg,
const PersistableQueue& queue)
{
+ // If this enqueue is in the context of a transaction, use the specified
+ // transaction to nest a new transaction for this operation. However, if
+ // this is not in the context of a transaction, then just use the thread's
+ // DatabaseConnection with a ADO transaction.
+ DatabaseConnection *db = 0;
AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (ctxt);
- if (atxn == 0)
- throw qpid::broker::InvalidTransactionContextException();
- (void)initState(); // Ensure this thread is initialized
- try {
- atxn->begin();
+ if (atxn == 0) {
+ db = initConnection();
+ db->beginTransaction();
+ }
+ else {
+ (void)initState(); // Ensure this thread is initialized
+ db = atxn->dbConn();
+ try {
+ atxn->begin();
+ }
+ catch(_com_error &e) {
+ throw ADOException("Error queuing message", e);
+ }
}
- catch(_com_error &e) {
- throw ADOException("Error queuing message", e);
- }
try {
if (msg->getPersistenceId() == 0) { // Message itself not yet saved
MessageRecordset rsMessages;
- rsMessages.open(atxn->dbConn(), TblMessage);
+ rsMessages.open(db, TblMessage);
rsMessages.add(msg);
}
MessageMapRecordset rsMap;
- rsMap.open(atxn->dbConn(), TblMessageMap);
+ rsMap.open(db, TblMessageMap);
rsMap.add(msg->getPersistenceId(), queue.getPersistenceId());
- atxn->commit();
+ if (atxn)
+ atxn->commit();
+ else
+ db->commitTransaction();
}
catch(_com_error &e) {
- atxn->abort();
+ if (atxn)
+ atxn->abort();
+ else
+ db->rollbackTransaction();
throw ADOException("Error queuing message", e);
- }
+ }
+ msg->enqueueComplete();
}
/**
@@ -731,32 +748,50 @@
const boost::intrusive_ptr<PersistableMessage>& msg,
const PersistableQueue& queue)
{
+ // If this dequeue is in the context of a transaction, use the specified
+ // transaction to nest a new transaction for this operation. However, if
+ // this is not in the context of a transaction, then just use the thread's
+ // DatabaseConnection with a ADO transaction.
+ DatabaseConnection *db = 0;
AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (ctxt);
- if (atxn == 0)
- throw qpid::broker::InvalidTransactionContextException();
- (void)initState(); // Ensure this thread is initialized
- try {
- atxn->begin();
+ if (atxn == 0) {
+ db = initConnection();
+ db->beginTransaction();
}
- catch(_com_error &e) {
- throw ADOException("Error queuing message", e);
- }
+ else {
+ (void)initState(); // Ensure this thread is initialized
+ db = atxn->dbConn();
+ try {
+ atxn->begin();
+ }
+ catch(_com_error &e) {
+ throw ADOException("Error queuing message", e);
+ }
+ }
+
try {
MessageMapRecordset rsMap;
- rsMap.open(atxn->dbConn(), TblMessageMap);
+ rsMap.open(db, TblMessageMap);
bool more = rsMap.remove(msg->getPersistenceId(),
queue.getPersistenceId());
if (!more) {
MessageRecordset rsMessages;
- rsMessages.open(atxn->dbConn(), TblMessage);
+ rsMessages.open(db, TblMessage);
rsMessages.remove(msg);
}
- atxn->commit();
+ if (atxn)
+ atxn->commit();
+ else
+ db->commitTransaction();
}
catch(_com_error &e) {
- atxn->abort();
+ if (atxn)
+ atxn->abort();
+ else
+ db->rollbackTransaction();
throw ADOException("Error dequeuing message", e);
}
+ msg->dequeueComplete();
}
std::auto_ptr<qpid::broker::TransactionContext>
@@ -795,6 +830,8 @@
void
MSSqlProvider::prepare(qpid::broker::TPCTransactionContext& txn)
{
+ // The inner transactions used for the components of the TPC are done;
+ // nothing else to do but wait for the commit.
}
void
Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp?rev=832234&r1=832233&r2=832234&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp Tue Nov 3
00:53:24 2009
@@ -42,6 +42,8 @@
rs->AddNew();
rs->Fields->GetItem("fieldTableBlob")->AppendChunk(blob);
rs->Update();
+ uint64_t id = rs->Fields->Item["persistenceId"]->Value;
+ msg->setPersistenceId(id);
}
void
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]