Author: kpvdr
Date: Tue Jul 19 19:57:06 2011
New Revision: 1148503

URL: http://svn.apache.org/viewvc?rev=1148503&view=rev
Log:
QPID-702656 Patch from Gordon Sim, plus tests that detect the condition being
fixed. Added a "make check-long" target to the Makefile in the cpp dir to make
it easier to run the long tests.

Added:
    qpid/trunk/qpid/cpp/src/tests/federation_sys.py   (with props)
    qpid/trunk/qpid/cpp/src/tests/run_federation_sys_tests   (with props)
    qpid/trunk/qpid/cpp/src/tests/run_long_federation_sys_tests
Modified:
    qpid/trunk/qpid/cpp/Makefile.am
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp
    qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
    qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp
    qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp
    qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp
    qpid/trunk/qpid/cpp/src/tests/Makefile.am
    qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
    qpid/trunk/qpid/cpp/src/tests/TxPublishTest.cpp
    qpid/trunk/qpid/cpp/src/tests/brokertest.py

Modified: qpid/trunk/qpid/cpp/Makefile.am
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/Makefile.am?rev=1148503&r1=1148502&r2=1148503&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/Makefile.am Tue Jul 19 19:57:06 2011
@@ -33,3 +33,7 @@ SUBDIRS = managementgen etc src docs/api
 # Update libtool, if needed.
 libtool: $(LIBTOOL_DEPS)
        $(SHELL) ./config.status --recheck
+
+check-long: all
+       $(MAKE) -C src/tests check-long
+       
\ No newline at end of file

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1148503&r1=1148502&r2=1148503&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Tue Jul 19 19:57:06 2011
@@ -434,8 +434,9 @@ Manageable::status_t Broker::ManagementM
         _qmf::ArgsBrokerConnect& hp=
             dynamic_cast<_qmf::ArgsBrokerConnect&>(args);
 
-        QPID_LOG (debug, "Broker::connect()");
         string transport = hp.i_transport.empty() ? TCP_TRANSPORT : 
hp.i_transport;
+        QPID_LOG (debug, "Broker::connect() " << hp.i_host << ":" << hp.i_port 
<< "; transport=" << transport <<
+                        "; durable=" << (hp.i_durable?"T":"F") << "; 
authMech=\"" << hp.i_authMechanism << "\"");
         if (!getProtocolFactory(transport)) {
             QPID_LOG(error, "Transport '" << transport << "' not supported");
             return  Manageable::STATUS_NOT_IMPLEMENTED;
@@ -452,9 +453,9 @@ Manageable::status_t Broker::ManagementM
         _qmf::ArgsBrokerQueueMoveMessages& moveArgs=
             dynamic_cast<_qmf::ArgsBrokerQueueMoveMessages&>(args);
         QPID_LOG (debug, "Broker::queueMoveMessages()");
-       if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, 
moveArgs.i_qty))
+        if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, 
moveArgs.i_qty))
             status = Manageable::STATUS_OK;
-       else
+        else
             return Manageable::STATUS_PARAMETER_INVALID;
         break;
       }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=1148503&r1=1148502&r2=1148503&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Tue Jul 19 19:57:06 
2011
@@ -75,7 +75,7 @@ void DeliveryRecord::deliver(framing::Fr
 {
     id = deliveryId;
     if (msg.payload->getRedelivered()){
-        
msg.payload->getProperties<framing::DeliveryProperties>()->setRedelivered(true);
+        msg.payload->setRedelivered();
     }
     msg.payload->adjustTtl();
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=1148503&r1=1148502&r2=1148503&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Tue Jul 19 19:57:06 2011
@@ -58,7 +58,7 @@ Exchange::PreRoute::PreRoute(Deliverable
 
         if (parent->sequence){
             parent->sequenceNo++;
-            
msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,parent->sequenceNo);
+            
msg.getMessage().insertCustomProperty(qpidMsgSequence,parent->sequenceNo);
         }
         if (parent->ive) {
             parent->lastMsg =  &( msg.getMessage());
@@ -390,7 +390,7 @@ bool Exchange::MatchQueue::operator()(Ex
 }
 
 void Exchange::setProperties(const boost::intrusive_ptr<Message>& msg) {
-    msg->getProperties<DeliveryProperties>()->setExchange(getName());
+    msg->setExchange(getName());
 }
 
 bool Exchange::routeWithAlternate(Deliverable& msg)

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=1148503&r1=1148502&r2=1148503&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Tue Jul 19 19:57:06 2011
@@ -30,7 +30,9 @@
 #include "qpid/framing/SendContent.h"
 #include "qpid/framing/SequenceNumber.h"
 #include "qpid/framing/TypeFilter.h"
+#include "qpid/framing/reply_exceptions.h"
 #include "qpid/log/Statement.h"
+#include <boost/pointer_cast.hpp>
 
 #include <time.h>
 
@@ -51,18 +53,9 @@ Message::Message(const framing::Sequence
     frames(id), persistenceId(0), redelivered(false), loaded(false),
     staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
     expiration(FAR_FUTURE), dequeueCallback(0),
-    inCallback(false), requiredCredit(0), isManagementMessage(false)
+    inCallback(false), requiredCredit(0), isManagementMessage(false), 
copyHeaderOnWrite(false)
 {}
 
-Message::Message(const Message& original) :
-    PersistableMessage(), frames(original.frames), persistenceId(0), 
redelivered(false), loaded(false),
-    staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
-    expiration(original.expiration), dequeueCallback(0),
-    inCallback(false), requiredCredit(0)
-{
-    setExpiryPolicy(original.expiryPolicy);
-}
-
 Message::~Message() {}
 
 void Message::forcePersistent()
@@ -288,6 +281,9 @@ void Message::sendHeader(framing::FrameH
     sys::Mutex::ScopedLock l(lock);
     Relay f(out);
     frames.map_if(f, TypeFilter<HEADER_BODY>());
+    //as frame (and pointer to body) has now been passed to handler,
+    //subsequent modifications should use a copy
+    copyHeaderOnWrite = true;
 }
 
 // TODO aconway 2007-11-09: Obsolete, remove. Was used to cover over
@@ -342,11 +338,30 @@ bool Message::isExcluded(const std::vect
     return false;
 }
 
+class CloneHeaderBody
+{
+public:
+    void operator()(AMQFrame& f)
+    {
+        f.cloneBody();
+    }
+};
+
+AMQHeaderBody* Message::getHeaderBody()
+{
+    if (copyHeaderOnWrite) {
+        CloneHeaderBody f;
+        frames.map_if(f, TypeFilter<HEADER_BODY>());
+        copyHeaderOnWrite = false;
+    }
+    return frames.getHeaders();
+}
+
 void Message::addTraceId(const std::string& id)
 {
     sys::Mutex::ScopedLock l(lock);
     if (isA<MessageTransferBody>()) {
-        FieldTable& headers = 
getProperties<MessageProperties>()->getApplicationHeaders();
+        FieldTable& headers = 
getModifiableProperties<MessageProperties>()->getApplicationHeaders();
         std::string trace = headers.getAsString(X_QPID_TRACE);
         if (trace.empty()) {
             headers.setString(X_QPID_TRACE, id);
@@ -360,7 +375,8 @@ void Message::addTraceId(const std::stri
 
 void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e)
 {
-    DeliveryProperties* props = getProperties<DeliveryProperties>();
+    sys::Mutex::ScopedLock l(lock);
+    DeliveryProperties* props = getModifiableProperties<DeliveryProperties>();
     if (props->getTtl()) {
         // AMQP requires setting the expiration property to be posix
         // time_t in seconds. TTL is in milliseconds
@@ -382,9 +398,9 @@ void Message::setTimestamp(const boost::
 
 void Message::adjustTtl()
 {
-    DeliveryProperties* props = getProperties<DeliveryProperties>();
+    sys::Mutex::ScopedLock l(lock);
+    DeliveryProperties* props = getModifiableProperties<DeliveryProperties>();
     if (props->getTtl()) {
-        sys::Mutex::ScopedLock l(lock);
         if (expiration < FAR_FUTURE) {
             sys::AbsTime current(
                 expiryPolicy ? expiryPolicy->getCurrentTime() : 
sys::AbsTime::now());
@@ -395,6 +411,42 @@ void Message::adjustTtl()
     }
 }
 
+void Message::setRedelivered()
+{
+    sys::Mutex::ScopedLock l(lock);
+    
getModifiableProperties<framing::DeliveryProperties>()->setRedelivered(true);
+}
+
+void Message::insertCustomProperty(const std::string& key, int64_t value)
+{
+    sys::Mutex::ScopedLock l(lock);
+    
getModifiableProperties<MessageProperties>()->getApplicationHeaders().setInt64(key,value);
+}
+
+void Message::insertCustomProperty(const std::string& key, const std::string& 
value)
+{
+    sys::Mutex::ScopedLock l(lock);
+    
getModifiableProperties<MessageProperties>()->getApplicationHeaders().setString(key,value);
+}
+
+void Message::removeCustomProperty(const std::string& key)
+{
+    sys::Mutex::ScopedLock l(lock);
+    
getModifiableProperties<MessageProperties>()->getApplicationHeaders().erase(key);
+}
+
+void Message::setExchange(const std::string& exchange)
+{
+    sys::Mutex::ScopedLock l(lock);
+    getModifiableProperties<DeliveryProperties>()->setExchange(exchange);
+}
+
+void Message::clearApplicationHeadersFlag()
+{
+    sys::Mutex::ScopedLock l(lock);
+    
getModifiableProperties<MessageProperties>()->clearApplicationHeadersFlag();
+}
+
 void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) {
     expiryPolicy = e;
 }
@@ -442,11 +494,6 @@ uint8_t Message::getPriority() const {
     return getAdapter().getPriority(frames);
 }
 
-framing::FieldTable& Message::getOrInsertHeaders()
-{
-    return getProperties<MessageProperties>()->getApplicationHeaders();
-}
-
 bool Message::getIsManagementMessage() const { return isManagementMessage; }
 void Message::setIsManagementMessage(bool b) { isManagementMessage = b; }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=1148503&r1=1148502&r2=1148503&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Tue Jul 19 19:57:06 2011
@@ -29,13 +29,17 @@
 #include "qpid/sys/Monitor.h"
 #include "qpid/sys/Time.h"
 #include <boost/function.hpp>
+#include <boost/intrusive_ptr.hpp>
 #include <boost/shared_ptr.hpp>
+#include <memory>
 #include <string>
 #include <vector>
 
 namespace qpid {
 
 namespace framing {
+class AMQBody;
+class AMQHeaderBody;
 class FieldTable;
 class SequenceNumber;
 }
@@ -53,7 +57,6 @@ public:
     typedef boost::function<void (const boost::intrusive_ptr<Message>&)> 
MessageCallback;
 
     QPID_BROKER_EXTERN Message(const framing::SequenceNumber& id = 
framing::SequenceNumber());
-    QPID_BROKER_EXTERN Message(const Message&);
     QPID_BROKER_EXTERN ~Message();
 
     uint64_t getPersistenceId() const { return persistenceId; }
@@ -75,7 +78,6 @@ public:
     bool isImmediate() const;
     QPID_BROKER_EXTERN const framing::FieldTable* getApplicationHeaders() 
const;
     QPID_BROKER_EXTERN std::string getAppId() const;
-    framing::FieldTable& getOrInsertHeaders();
     QPID_BROKER_EXTERN bool isPersistent() const;
     bool requiresAccept();
 
@@ -85,18 +87,19 @@ public:
     sys::AbsTime getExpiration() const { return expiration; }
     void setExpiration(sys::AbsTime exp) { expiration = exp; }
     void adjustTtl();
+    void setRedelivered();
+    void insertCustomProperty(const std::string& key, int64_t value);
+    void insertCustomProperty(const std::string& key, const std::string& 
value);
+    void removeCustomProperty(const std::string& key);
+    void setExchange(const std::string&);
+    void clearApplicationHeadersFlag();
 
     framing::FrameSet& getFrames() { return frames; }
     const framing::FrameSet& getFrames() const { return frames; }
 
-    template <class T> T* getProperties() {
-        qpid::framing::AMQHeaderBody* p = frames.getHeaders();
-        return p->get<T>(true);
-    }
-
     template <class T> const T* getProperties() const {
         const qpid::framing::AMQHeaderBody* p = frames.getHeaders();
-        return p->get<T>(true);
+        return p->get<T>();
     }
 
     template <class T> const T* hasProperties() const {
@@ -156,9 +159,8 @@ public:
     bool isExcluded(const std::vector<std::string>& excludes) const;
     void addTraceId(const std::string& id);
 
-       void forcePersistent();
-       bool isForcedPersistent();
-
+    void forcePersistent();
+    bool isForcedPersistent();
 
     /** Call cb when dequeue is complete, may call immediately. Holds cb by 
reference. */
     void setDequeueCompleteCallback(MessageCallback& cb);
@@ -178,7 +180,7 @@ public:
     bool redelivered;
     bool loaded;
     bool staged;
-       bool forcePersistentPolicy; // used to force message as durable, via a 
broker policy
+    bool forcePersistentPolicy; // used to force message as durable, via a 
broker policy
     ConnectionToken* publisher;
     mutable MessageAdapter* adapter;
     qpid::sys::AbsTime expiration;
@@ -194,6 +196,15 @@ public:
 
     uint32_t requiredCredit;
     bool isManagementMessage;
+      mutable bool copyHeaderOnWrite;
+
+    /**
+     * Expects lock to be held
+     */
+    template <class T> T* getModifiableProperties() {
+        return getHeaderBody()->get<T>(true);
+    }
+    qpid::framing::AMQHeaderBody* getHeaderBody();
 };
 
 }}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1148503&r1=1148502&r2=1148503&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Jul 19 19:57:06 2011
@@ -525,7 +525,7 @@ void Queue::push(boost::intrusive_ptr<Me
     {
         Mutex::ScopedLock locker(messageLock);
         QueuedMessage qm(this, msg, ++sequence);
-        if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, 
sequence);
+        if (insertSeqNo) msg->insertCustomProperty(seqNoKey, sequence);
 
         dequeueRequired = messages->push(qm, removed);
         listeners.populate(copy);
@@ -627,11 +627,6 @@ bool Queue::enqueue(TransactionContext* 
     }
 
     if (traceId.size()) {
-        //copy on write: take deep copy of message before modifying it
-        //as the frames may already be available for delivery on other
-        //threads
-        boost::intrusive_ptr<Message> copy(new Message(*msg));
-        msg = copy;
         msg->addTraceId(traceId);
     }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=1148503&r1=1148502&r2=1148503&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Tue Jul 19 19:57:06 
2011
@@ -318,22 +318,22 @@ class MessageUpdater {
         lastPos = message.position;
 
         // if the ttl > 0, we need to send the calculated expiration time to 
the updatee
-        if (message.payload->getProperties<DeliveryProperties>()->getTtl() > 
0) {
+        const DeliveryProperties* dprops =
+            message.payload->getProperties<DeliveryProperties>();
+        if (dprops && dprops->getTtl() > 0) {
             bool hadMessageProps =
                 message.payload->hasProperties<framing::MessageProperties>();
-            framing::MessageProperties* mprops =
+            const framing::MessageProperties* mprops =
                 message.payload->getProperties<framing::MessageProperties>();
             bool hadApplicationHeaders = mprops->hasApplicationHeaders();
-            FieldTable& applicationHeaders = mprops->getApplicationHeaders();
-            applicationHeaders.setInt64(
-                UpdateClient::X_QPID_EXPIRATION,
-                sys::Duration(sys::EPOCH, message.payload->getExpiration()));
+            
message.payload->insertCustomProperty(UpdateClient::X_QPID_EXPIRATION,
+                            sys::Duration(sys::EPOCH, 
message.payload->getExpiration()));
             // If message properties or application headers didn't exist
             // prior to us adding data, we want to remove them on the other 
side.
             if (!hadMessageProps)
-                
applicationHeaders.setInt(UpdateClient::X_QPID_NO_MESSAGE_PROPS, 0);
+                
message.payload->insertCustomProperty(UpdateClient::X_QPID_NO_MESSAGE_PROPS, 0);
             else if (!hadApplicationHeaders)
-                applicationHeaders.setInt(UpdateClient::X_QPID_NO_HEADERS, 0);
+                
message.payload->insertCustomProperty(UpdateClient::X_QPID_NO_HEADERS, 0);
         }
 
         // We can't send a broker::Message via the normal client API,

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp?rev=1148503&r1=1148502&r2=1148503&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp Tue Jul 19 19:57:06 
2011
@@ -49,18 +49,18 @@ void UpdateExchange::setProperties(const
 
     // Copy expiration from x-property if present.
     if (msg->hasProperties<MessageProperties>()) {
-        MessageProperties* mprops = msg->getProperties<MessageProperties>();
+        const MessageProperties* mprops = 
msg->getProperties<MessageProperties>();
         if (mprops->hasApplicationHeaders()) {
-            FieldTable& headers = mprops->getApplicationHeaders();
+            const FieldTable& headers = mprops->getApplicationHeaders();
             if (headers.isSet(UpdateClient::X_QPID_EXPIRATION)) {
                 msg->setExpiration(
                     sys::AbsTime(sys::EPOCH, 
headers.getAsInt64(UpdateClient::X_QPID_EXPIRATION)));
-                headers.erase(UpdateClient::X_QPID_EXPIRATION);
+                msg->removeCustomProperty(UpdateClient::X_QPID_EXPIRATION);
                 // Erase props/headers that were added by the UpdateClient
                 if (headers.isSet(UpdateClient::X_QPID_NO_MESSAGE_PROPS))
                     msg->eraseProperties<MessageProperties>();
                 else if (headers.isSet(UpdateClient::X_QPID_NO_HEADERS))
-                    mprops->clearApplicationHeadersFlag();
+                    msg->clearApplicationHeadersFlag();
             }
         }
     }

Modified: qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp?rev=1148503&r1=1148502&r2=1148503&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp Tue Jul 19 19:57:06 2011
@@ -139,6 +139,11 @@ bool AMQFrame::decode(Buffer& buffer)
     return true;
 }
 
+void AMQFrame::cloneBody()
+{
+    body = body->clone();
+}
+
 std::ostream& operator<<(std::ostream& out, const AMQFrame& f)
 {
     return

Modified: qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h?rev=1148503&r1=1148502&r2=1148503&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h Tue Jul 19 19:57:06 2011
@@ -59,6 +59,11 @@ class QPID_COMMON_CLASS_EXTERN AMQFrame 
         return boost::polymorphic_downcast<const T*>(getBody());
     }
 
+    /**
+     * Take a deep copy of the body currently referenced
+     */
+    void cloneBody();
+
     QPID_COMMON_EXTERN void encode(Buffer& buffer) const; 
     QPID_COMMON_EXTERN bool decode(Buffer& buffer); 
     QPID_COMMON_EXTERN uint32_t encodedSize() const;

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=1148503&r1=1148502&r2=1148503&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Tue Jul 19 
19:57:06 2011
@@ -614,7 +614,7 @@ void ManagementAgent::sendBufferLH(const
     props->setAppId("qmf2");
 
     for (i = headers.begin(); i != headers.end(); ++i) {
-        msg->getOrInsertHeaders().setString(i->first, i->second.asString());
+        msg->insertCustomProperty(i->first, i->second.asString());
     }
 
     DeliveryProperties* dp =

Modified: qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp?rev=1148503&r1=1148502&r2=1148503&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp 
(original)
+++ qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp Tue 
Jul 19 19:57:06 2011
@@ -69,10 +69,9 @@ void ReplicatingEventListener::deliverDe
 void ReplicatingEventListener::deliverEnqueueMessage(const QueuedMessage& 
enqueued)
 {
     boost::intrusive_ptr<Message> msg(cloneMessage(*(enqueued.queue), 
enqueued.payload));
-    FieldTable& headers = 
msg->getProperties<MessageProperties>()->getApplicationHeaders();
-    headers.setString(REPLICATION_TARGET_QUEUE, enqueued.queue->getName());
-    headers.setInt(REPLICATION_EVENT_TYPE, ENQUEUE);
-    headers.setInt(QUEUE_MESSAGE_POSITION,enqueued.position);
+    msg->insertCustomProperty(REPLICATION_TARGET_QUEUE, 
enqueued.queue->getName());
+    msg->insertCustomProperty(REPLICATION_EVENT_TYPE, ENQUEUE);
+    msg->insertCustomProperty(QUEUE_MESSAGE_POSITION,enqueued.position);
     route(msg);
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp?rev=1148503&r1=1148502&r2=1148503&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp Tue Jul 19 
19:57:06 2011
@@ -97,11 +97,10 @@ void ReplicationExchange::handleEnqueueE
         } else {
             queue->setPosition(seqno1);  
 
-            FieldTable& headers = 
msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders();
-            headers.erase(REPLICATION_TARGET_QUEUE);
-            headers.erase(REPLICATION_EVENT_SEQNO);
-            headers.erase(REPLICATION_EVENT_TYPE);
-            headers.erase(QUEUE_MESSAGE_POSITION);
+            msg.getMessage().removeCustomProperty(REPLICATION_TARGET_QUEUE);
+            msg.getMessage().removeCustomProperty(REPLICATION_EVENT_SEQNO);
+            msg.getMessage().removeCustomProperty(REPLICATION_EVENT_TYPE);
+            msg.getMessage().removeCustomProperty(QUEUE_MESSAGE_POSITION);
             msg.deliverTo(queue);
             QPID_LOG(debug, "Enqueued replicated message onto " << queueName);
             if (mgmtExchange != 0) {

Modified: qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp?rev=1148503&r1=1148502&r2=1148503&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp Tue Jul 19 19:57:06 2011
@@ -253,7 +253,7 @@ QPID_AUTO_TEST_CASE(testIVEOption)
     TopicExchange topic ("topic1", false, args);
 
     intrusive_ptr<Message> msg1 = cmessage("direct1", "abc");
-    
msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString("a",
 "abc");
+    msg1->insertCustomProperty("a", "abc");
     DeliverableMessage dmsg1(msg1);
 
     FieldTable args2;

Modified: qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=1148503&r1=1148502&r2=1148503&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/tests/Makefile.am Tue Jul 19 19:57:06 2011
@@ -298,7 +298,7 @@ TESTS_ENVIRONMENT = \
     $(srcdir)/run_test 
 
 system_tests = qpid-client-test quick_perftest quick_topictest run_header_test 
quick_txtest
-TESTS += start_broker $(system_tests) python_tests stop_broker 
run_federation_tests \
+TESTS += start_broker $(system_tests) python_tests stop_broker 
run_federation_tests run_federation_sys_tests \
   run_acl_tests run_cli_tests replication_test dynamic_log_level_test \
   run_queue_flow_limit_tests
 
@@ -315,6 +315,8 @@ EXTRA_DIST +=                                               
                \
   config.null                                                          \
   ais_check                                                            \
   run_federation_tests                                                 \
+  run_federation_sys_tests                  \
+  run_long_federation_sys_tests             \
   run_cli_tests                                                                
\
   run_acl_tests                                                                
\
   .valgrind.supp                                                       \
@@ -352,6 +354,7 @@ CLEANFILES+=valgrind.out *.log *.vglog* 
 # Not run under valgrind, too slow
 
 LONG_TESTS+=start_broker fanout_perftest shared_perftest multiq_perftest 
topic_perftest run_ring_queue_test stop_broker \
+ run_long_federation_sys_tests \
  run_failover_soak reliable_replication_test \
  federated_cluster_test_with_node_failure
 

Modified: qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=1148503&r1=1148502&r2=1148503&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Tue Jul 19 19:57:06 2011
@@ -81,13 +81,14 @@ public:
     Message& getMessage() { return *(msg.get()); }
 };
 
-intrusive_ptr<Message> create_message(std::string exchange, std::string 
routingKey) {
+intrusive_ptr<Message> create_message(std::string exchange, std::string 
routingKey, uint64_t ttl = 0) {
     intrusive_ptr<Message> msg(new Message());
     AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
     AMQFrame header((AMQHeaderBody()));
     msg->getFrames().append(method);
     msg->getFrames().append(header);
     
msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
+    if (ttl) 
msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setTtl(ttl);
     return msg;
 }
 
@@ -437,10 +438,10 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){
     BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
 
 
-       
msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
-       
msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
-       
msg3->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
-       
msg4->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+       msg1->insertCustomProperty(key,"a");
+       msg2->insertCustomProperty(key,"b");
+       msg3->insertCustomProperty(key,"c");
+       msg4->insertCustomProperty(key,"a");
 
        //enqueue 4 message
     queue->deliver(msg1);
@@ -462,9 +463,9 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){
     intrusive_ptr<Message> msg5 = create_message("e", "A");
     intrusive_ptr<Message> msg6 = create_message("e", "B");
     intrusive_ptr<Message> msg7 = create_message("e", "C");
-       
msg5->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
-       
msg6->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
-       
msg7->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
+       msg5->insertCustomProperty(key,"a");
+       msg6->insertCustomProperty(key,"b");
+       msg7->insertCustomProperty(key,"c");
     queue->deliver(msg5);
     queue->deliver(msg6);
     queue->deliver(msg7);
@@ -499,7 +500,7 @@ QPID_AUTO_TEST_CASE(testLVQEmptyKey){
     BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
 
 
-    
msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+    msg1->insertCustomProperty(key,"a");
     queue->deliver(msg1);
     queue->deliver(msg2);
     BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
@@ -531,12 +532,12 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
     BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
 
 
-    
msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
-    
msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
-    
msg3->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
-    
msg4->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
-    
msg5->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
-    
msg6->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
+    msg1->insertCustomProperty(key,"a");
+    msg2->insertCustomProperty(key,"b");
+    msg3->insertCustomProperty(key,"c");
+    msg4->insertCustomProperty(key,"a");
+    msg5->insertCustomProperty(key,"b");
+    msg6->insertCustomProperty(key,"c");
 
     //enqueue 4 message
     queue->deliver(msg1);
@@ -600,8 +601,8 @@ QPID_AUTO_TEST_CASE(testLVQMultiQueue){
     args.getLVQKey(key);
     BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
 
-    
msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
-    
msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+    msg1->insertCustomProperty(key,"a");
+    msg2->insertCustomProperty(key,"a");
 
     queue1->deliver(msg1);
     queue2->deliver(msg1);
@@ -644,8 +645,8 @@ QPID_AUTO_TEST_CASE(testLVQRecover){
     args.getLVQKey(key);
     BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
 
-    msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
-    msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+    msg1->insertCustomProperty(key,"a");
+    msg2->insertCustomProperty(key,"a");
        // 3
     queue1->deliver(msg1);
     // 4
@@ -665,12 +666,7 @@ QPID_AUTO_TEST_CASE(testLVQRecover){
 void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0)
 {
     for (uint i = 0; i < count; i++) {
-        intrusive_ptr<Message> m = create_message("exchange", "key");
-        if (i % 2) {
-            if (oddTtl) m->getProperties<DeliveryProperties>()->setTtl(oddTtl);
-        } else {
-            if (evenTtl) m->getProperties<DeliveryProperties>()->setTtl(evenTtl);
-        }
+        intrusive_ptr<Message> m = create_message("exchange", "key", i % 2 ? oddTtl : evenTtl);
         m->setTimestamp(new broker::ExpiryPolicy);
         queue.deliver(m);
     }

Modified: qpid/trunk/qpid/cpp/src/tests/TxPublishTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/TxPublishTest.cpp?rev=1148503&r1=1148502&r2=1148503&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/TxPublishTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/TxPublishTest.cpp Tue Jul 19 19:57:06 2011
@@ -50,10 +50,9 @@ struct TxPublishTest
     TxPublishTest() :
         queue1(new Queue("queue1", false, &store, 0)),
         queue2(new Queue("queue2", false, &store, 0)),
-        msg(MessageUtils::createMessage("exchange", "routing_key", false, "id")),
+        msg(MessageUtils::createMessage("exchange", "routing_key", true)),
         op(msg)
     {
-        msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT);
         op.deliverTo(queue1);
         op.deliverTo(queue2);
     }

Modified: qpid/trunk/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/brokertest.py?rev=1148503&r1=1148502&r2=1148503&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/brokertest.py Tue Jul 19 19:57:06 2011
@@ -251,7 +251,7 @@ class Broker(Popen):
     def get_log(self):
         return os.path.abspath(self.log)
 
-    def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING, port=0, log_level=None, wait=None):
+    def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING, port=0, log_level=None, wait=None, show_cmd=False):
         """Start a broker daemon. name determines the data-dir and log
         file names."""
 
@@ -280,6 +280,7 @@ class Broker(Popen):
             cmd += ["--log-enable=%s" % log_level]
         self.datadir = self.name
         cmd += ["--data-dir", self.datadir]
+        if show_cmd: print cmd
         Popen.__init__(self, cmd, expect, stdout=PIPE)
         test.cleanup_stop(self)
         self._host = "127.0.0.1"
@@ -400,7 +401,7 @@ class Cluster:
 
     _cluster_count = 0
 
-    def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True):
+    def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True, show_cmd=False):
         self.test = test
         self._brokers=[]
         self.name = "cluster%d" % Cluster._cluster_count
@@ -411,16 +412,16 @@ class Cluster:
         self.args += [ "--log-enable=info+", "--log-enable=debug+:cluster"]
         assert BrokerTest.cluster_lib, "Cannot locate cluster plug-in"
         self.args += [ "--load-module", BrokerTest.cluster_lib ]
-        self.start_n(count, expect=expect, wait=wait)
+        self.start_n(count, expect=expect, wait=wait, show_cmd=show_cmd)
 
-    def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[], port=0):
+    def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[], port=0, show_cmd=False):
         """Add a broker to the cluster. Returns the index of the new broker."""
         if not name: name="%s-%d" % (self.name, len(self._brokers))
-        self._brokers.append(self.test.broker(self.args+args, name, expect, wait, port=port))
+        self._brokers.append(self.test.broker(self.args+args, name, expect, wait, port=port, show_cmd=show_cmd))
         return self._brokers[-1]
 
-    def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[]):
-        for i in range(count): self.start(expect=expect, wait=wait, args=args)
+    def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[], show_cmd=False):
+        for i in range(count): self.start(expect=expect, wait=wait, args=args, show_cmd=show_cmd)
 
     # Behave like a list of brokers.
     def __len__(self): return len(self._brokers)
@@ -477,18 +478,18 @@ class BrokerTest(TestCase):
         self.cleanup_stop(p)
         return p
 
-    def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, log_level=None):
+    def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, log_level=None, show_cmd=False):
         """Create and return a broker ready for use"""
-        b = Broker(self, args=args, name=name, expect=expect, port=port, log_level=log_level)
+        b = Broker(self, args=args, name=name, expect=expect, port=port, log_level=log_level, show_cmd=show_cmd)
         if (wait):
             try: b.ready()
             except Exception, e:
                 raise RethrownException("Failed to start broker %s(%s): %s" % (b.name, b.log, e))
         return b
 
-    def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True):
+    def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True, show_cmd=False):
         """Create and return a cluster ready for use"""
-        cluster = Cluster(self, count, args, expect=expect, wait=wait)
+        cluster = Cluster(self, count, args, expect=expect, wait=wait, show_cmd=show_cmd)
         return cluster
 
     def browse(self, session, queue, timeout=0):



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to