Author: shuston
Date: Tue Nov  3 00:53:24 2009
New Revision: 832234

URL: http://svn.apache.org/viewvc?rev=832234&view=rev
Log:
When storing a message, set its new persistence Id. Don't throw an exception 
when attempting to enqueue/dequeue a message without an associated AMQP 
transaction. Mark queued/dequeued messages complete right after storing in 
database; there's no delayed notify needed for this storage provider.
Fixes QPID-2170.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/store/CMakeLists.txt
    qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
    qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/store/CMakeLists.txt
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/CMakeLists.txt?rev=832234&r1=832233&r2=832234&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/CMakeLists.txt Tue Nov  3 00:53:24 2009
@@ -73,7 +73,7 @@
                ms-sql/Recordset.cpp
                ms-sql/State.cpp
                ms-sql/VariantHelper.cpp)
-  target_link_libraries (mssql_store qpidcommon 
${Boost_PROGRAM_OPTIONS_LIBRARY})
+  target_link_libraries (mssql_store qpidbroker qpidcommon 
${Boost_PROGRAM_OPTIONS_LIBRARY})
   install (TARGETS mssql_store # RUNTIME
            DESTINATION ${QPIDD_MODULE_DIR}
            COMPONENT ${QPID_COMPONENT_BROKER})

Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp?rev=832234&r1=832233&r2=832234&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp Tue Nov  3 
00:53:24 2009
@@ -689,32 +689,49 @@
                        const boost::intrusive_ptr<PersistableMessage>& msg,
                        const PersistableQueue& queue)
 {
+    // If this enqueue is in the context of a transaction, use the specified
+    // transaction to nest a new transaction for this operation. However, if
+    // this is not in the context of a transaction, then just use the thread's
+    // DatabaseConnection with a ADO transaction.
+    DatabaseConnection *db = 0;
     AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (ctxt);
-    if (atxn == 0)
-        throw qpid::broker::InvalidTransactionContextException();
-    (void)initState();     // Ensure this thread is initialized
-    try {
-        atxn->begin();
+    if (atxn == 0) {
+        db = initConnection();
+        db->beginTransaction();
+    }
+    else {
+        (void)initState();     // Ensure this thread is initialized
+        db = atxn->dbConn();
+        try {
+          atxn->begin();
+        }
+        catch(_com_error &e) {
+            throw ADOException("Error queuing message", e);
+        }
     }
-    catch(_com_error &e) {
-        throw ADOException("Error queuing message", e);
-    }  
 
     try {
         if (msg->getPersistenceId() == 0) {    // Message itself not yet saved
             MessageRecordset rsMessages;
-            rsMessages.open(atxn->dbConn(), TblMessage);
+            rsMessages.open(db, TblMessage);
             rsMessages.add(msg);
         }
         MessageMapRecordset rsMap;
-        rsMap.open(atxn->dbConn(), TblMessageMap);
+        rsMap.open(db, TblMessageMap);
         rsMap.add(msg->getPersistenceId(), queue.getPersistenceId());
-        atxn->commit();
+        if (atxn)
+            atxn->commit();
+        else
+            db->commitTransaction();
     }
     catch(_com_error &e) {
-        atxn->abort();
+        if (atxn)
+            atxn->abort();
+        else
+            db->rollbackTransaction();
         throw ADOException("Error queuing message", e);
-    }  
+    }
+    msg->enqueueComplete();
 }
 
 /**
@@ -731,32 +748,50 @@
                        const boost::intrusive_ptr<PersistableMessage>& msg,
                        const PersistableQueue& queue)
 {
+    // If this dequeue is in the context of a transaction, use the specified
+    // transaction to nest a new transaction for this operation. However, if
+    // this is not in the context of a transaction, then just use the thread's
+    // DatabaseConnection with a ADO transaction.
+    DatabaseConnection *db = 0;
     AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (ctxt);
-    if (atxn == 0)
-        throw qpid::broker::InvalidTransactionContextException();
-    (void)initState();     // Ensure this thread is initialized
-    try {
-        atxn->begin();
+    if (atxn == 0) {
+        db = initConnection();
+        db->beginTransaction();
     }
-    catch(_com_error &e) {
-        throw ADOException("Error queuing message", e);
-    }  
+    else {
+        (void)initState();     // Ensure this thread is initialized
+        db = atxn->dbConn();
+        try {
+            atxn->begin();
+        }
+        catch(_com_error &e) {
+            throw ADOException("Error queuing message", e);
+        }
+    }
+
     try {
         MessageMapRecordset rsMap;
-        rsMap.open(atxn->dbConn(), TblMessageMap);
+        rsMap.open(db, TblMessageMap);
         bool more = rsMap.remove(msg->getPersistenceId(),
                                  queue.getPersistenceId());
         if (!more) {
             MessageRecordset rsMessages;
-            rsMessages.open(atxn->dbConn(), TblMessage);
+            rsMessages.open(db, TblMessage);
             rsMessages.remove(msg);
         }
-        atxn->commit();
+        if (atxn)
+            atxn->commit();
+        else
+            db->commitTransaction();
     }
     catch(_com_error &e) {
-        atxn->abort();
+        if (atxn)
+            atxn->abort();
+        else
+            db->rollbackTransaction();
         throw ADOException("Error dequeuing message", e);
     }  
+    msg->dequeueComplete();
 }
 
 std::auto_ptr<qpid::broker::TransactionContext>
@@ -795,6 +830,8 @@
 void
 MSSqlProvider::prepare(qpid::broker::TPCTransactionContext& txn)
 {
+    // The inner transactions used for the components of the TPC are done;
+    // nothing else to do but wait for the commit.
 }
 
 void

Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp?rev=832234&r1=832233&r2=832234&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp Tue Nov  3 
00:53:24 2009
@@ -42,6 +42,8 @@
     rs->AddNew();
     rs->Fields->GetItem("fieldTableBlob")->AppendChunk(blob);
     rs->Update();
+    uint64_t id = rs->Fields->Item["persistenceId"]->Value;
+    msg->setPersistenceId(id);
 }
 
 void



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to