Author: kpvdr
Date: Tue Jul 19 19:57:06 2011
New Revision: 1148503
URL: http://svn.apache.org/viewvc?rev=1148503&view=rev
Log:
QPID-702656 Patch from Gordon Sim, plus tests that detect the condition being
fixed. Added a "make check-long" target to the Makefile in the cpp dir to make
it easier to run the long tests.
Added:
qpid/trunk/qpid/cpp/src/tests/federation_sys.py (with props)
qpid/trunk/qpid/cpp/src/tests/run_federation_sys_tests (with props)
qpid/trunk/qpid/cpp/src/tests/run_long_federation_sys_tests
Modified:
qpid/trunk/qpid/cpp/Makefile.am
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp
qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp
qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h
qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp
qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp
qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp
qpid/trunk/qpid/cpp/src/tests/Makefile.am
qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
qpid/trunk/qpid/cpp/src/tests/TxPublishTest.cpp
qpid/trunk/qpid/cpp/src/tests/brokertest.py
Modified: qpid/trunk/qpid/cpp/Makefile.am
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/Makefile.am?rev=1148503&r1=1148502&r2=1148503&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/Makefile.am Tue Jul 19 19:57:06 2011
@@ -33,3 +33,7 @@ SUBDIRS = managementgen etc src docs/api
# Update libtool, if needed.
libtool: $(LIBTOOL_DEPS)
$(SHELL) ./config.status --recheck
+
+check-long: all
+ $(MAKE) -C src/tests check-long
+
\ No newline at end of file
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1148503&r1=1148502&r2=1148503&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Tue Jul 19 19:57:06 2011
@@ -434,8 +434,9 @@ Manageable::status_t Broker::ManagementM
_qmf::ArgsBrokerConnect& hp=
dynamic_cast<_qmf::ArgsBrokerConnect&>(args);
- QPID_LOG (debug, "Broker::connect()");
string transport = hp.i_transport.empty() ? TCP_TRANSPORT :
hp.i_transport;
+ QPID_LOG (debug, "Broker::connect() " << hp.i_host << ":" << hp.i_port << "; transport=" << transport <<
+ "; durable=" << (hp.i_durable?"T":"F") << "; authMech=\"" << hp.i_authMechanism << "\"");
if (!getProtocolFactory(transport)) {
QPID_LOG(error, "Transport '" << transport << "' not supported");
return Manageable::STATUS_NOT_IMPLEMENTED;
@@ -452,9 +453,9 @@ Manageable::status_t Broker::ManagementM
_qmf::ArgsBrokerQueueMoveMessages& moveArgs=
dynamic_cast<_qmf::ArgsBrokerQueueMoveMessages&>(args);
QPID_LOG (debug, "Broker::queueMoveMessages()");
- if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue,
moveArgs.i_qty))
+ if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue,
moveArgs.i_qty))
status = Manageable::STATUS_OK;
- else
+ else
return Manageable::STATUS_PARAMETER_INVALID;
break;
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=1148503&r1=1148502&r2=1148503&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Tue Jul 19 19:57:06 2011
@@ -75,7 +75,7 @@ void DeliveryRecord::deliver(framing::Fr
{
id = deliveryId;
if (msg.payload->getRedelivered()){
-
msg.payload->getProperties<framing::DeliveryProperties>()->setRedelivered(true);
+ msg.payload->setRedelivered();
}
msg.payload->adjustTtl();
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=1148503&r1=1148502&r2=1148503&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Tue Jul 19 19:57:06 2011
@@ -58,7 +58,7 @@ Exchange::PreRoute::PreRoute(Deliverable
if (parent->sequence){
parent->sequenceNo++;
-
msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,parent->sequenceNo);
+
msg.getMessage().insertCustomProperty(qpidMsgSequence,parent->sequenceNo);
}
if (parent->ive) {
parent->lastMsg = &( msg.getMessage());
@@ -390,7 +390,7 @@ bool Exchange::MatchQueue::operator()(Ex
}
void Exchange::setProperties(const boost::intrusive_ptr<Message>& msg) {
- msg->getProperties<DeliveryProperties>()->setExchange(getName());
+ msg->setExchange(getName());
}
bool Exchange::routeWithAlternate(Deliverable& msg)
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=1148503&r1=1148502&r2=1148503&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Tue Jul 19 19:57:06 2011
@@ -30,7 +30,9 @@
#include "qpid/framing/SendContent.h"
#include "qpid/framing/SequenceNumber.h"
#include "qpid/framing/TypeFilter.h"
+#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Statement.h"
+#include <boost/pointer_cast.hpp>
#include <time.h>
@@ -51,18 +53,9 @@ Message::Message(const framing::Sequence
frames(id), persistenceId(0), redelivered(false), loaded(false),
staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
expiration(FAR_FUTURE), dequeueCallback(0),
- inCallback(false), requiredCredit(0), isManagementMessage(false)
+ inCallback(false), requiredCredit(0), isManagementMessage(false),
copyHeaderOnWrite(false)
{}
-Message::Message(const Message& original) :
- PersistableMessage(), frames(original.frames), persistenceId(0),
redelivered(false), loaded(false),
- staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
- expiration(original.expiration), dequeueCallback(0),
- inCallback(false), requiredCredit(0)
-{
- setExpiryPolicy(original.expiryPolicy);
-}
-
Message::~Message() {}
void Message::forcePersistent()
@@ -288,6 +281,9 @@ void Message::sendHeader(framing::FrameH
sys::Mutex::ScopedLock l(lock);
Relay f(out);
frames.map_if(f, TypeFilter<HEADER_BODY>());
+ //as frame (and pointer to body) has now been passed to handler,
+ //subsequent modifications should use a copy
+ copyHeaderOnWrite = true;
}
// TODO aconway 2007-11-09: Obsolete, remove. Was used to cover over
@@ -342,11 +338,30 @@ bool Message::isExcluded(const std::vect
return false;
}
+class CloneHeaderBody
+{
+public:
+ void operator()(AMQFrame& f)
+ {
+ f.cloneBody();
+ }
+};
+
+AMQHeaderBody* Message::getHeaderBody()
+{
+ if (copyHeaderOnWrite) {
+ CloneHeaderBody f;
+ frames.map_if(f, TypeFilter<HEADER_BODY>());
+ copyHeaderOnWrite = false;
+ }
+ return frames.getHeaders();
+}
+
void Message::addTraceId(const std::string& id)
{
sys::Mutex::ScopedLock l(lock);
if (isA<MessageTransferBody>()) {
- FieldTable& headers =
getProperties<MessageProperties>()->getApplicationHeaders();
+ FieldTable& headers =
getModifiableProperties<MessageProperties>()->getApplicationHeaders();
std::string trace = headers.getAsString(X_QPID_TRACE);
if (trace.empty()) {
headers.setString(X_QPID_TRACE, id);
@@ -360,7 +375,8 @@ void Message::addTraceId(const std::stri
void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e)
{
- DeliveryProperties* props = getProperties<DeliveryProperties>();
+ sys::Mutex::ScopedLock l(lock);
+ DeliveryProperties* props = getModifiableProperties<DeliveryProperties>();
if (props->getTtl()) {
// AMQP requires setting the expiration property to be posix
// time_t in seconds. TTL is in milliseconds
@@ -382,9 +398,9 @@ void Message::setTimestamp(const boost::
void Message::adjustTtl()
{
- DeliveryProperties* props = getProperties<DeliveryProperties>();
+ sys::Mutex::ScopedLock l(lock);
+ DeliveryProperties* props = getModifiableProperties<DeliveryProperties>();
if (props->getTtl()) {
- sys::Mutex::ScopedLock l(lock);
if (expiration < FAR_FUTURE) {
sys::AbsTime current(
expiryPolicy ? expiryPolicy->getCurrentTime() :
sys::AbsTime::now());
@@ -395,6 +411,42 @@ void Message::adjustTtl()
}
}
+void Message::setRedelivered()
+{
+ sys::Mutex::ScopedLock l(lock);
+
getModifiableProperties<framing::DeliveryProperties>()->setRedelivered(true);
+}
+
+void Message::insertCustomProperty(const std::string& key, int64_t value)
+{
+ sys::Mutex::ScopedLock l(lock);
+
getModifiableProperties<MessageProperties>()->getApplicationHeaders().setInt64(key,value);
+}
+
+void Message::insertCustomProperty(const std::string& key, const std::string&
value)
+{
+ sys::Mutex::ScopedLock l(lock);
+
getModifiableProperties<MessageProperties>()->getApplicationHeaders().setString(key,value);
+}
+
+void Message::removeCustomProperty(const std::string& key)
+{
+ sys::Mutex::ScopedLock l(lock);
+
getModifiableProperties<MessageProperties>()->getApplicationHeaders().erase(key);
+}
+
+void Message::setExchange(const std::string& exchange)
+{
+ sys::Mutex::ScopedLock l(lock);
+ getModifiableProperties<DeliveryProperties>()->setExchange(exchange);
+}
+
+void Message::clearApplicationHeadersFlag()
+{
+ sys::Mutex::ScopedLock l(lock);
+
getModifiableProperties<MessageProperties>()->clearApplicationHeadersFlag();
+}
+
void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) {
expiryPolicy = e;
}
@@ -442,11 +494,6 @@ uint8_t Message::getPriority() const {
return getAdapter().getPriority(frames);
}
-framing::FieldTable& Message::getOrInsertHeaders()
-{
- return getProperties<MessageProperties>()->getApplicationHeaders();
-}
-
bool Message::getIsManagementMessage() const { return isManagementMessage; }
void Message::setIsManagementMessage(bool b) { isManagementMessage = b; }
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=1148503&r1=1148502&r2=1148503&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Tue Jul 19 19:57:06 2011
@@ -29,13 +29,17 @@
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Time.h"
#include <boost/function.hpp>
+#include <boost/intrusive_ptr.hpp>
#include <boost/shared_ptr.hpp>
+#include <memory>
#include <string>
#include <vector>
namespace qpid {
namespace framing {
+class AMQBody;
+class AMQHeaderBody;
class FieldTable;
class SequenceNumber;
}
@@ -53,7 +57,6 @@ public:
typedef boost::function<void (const boost::intrusive_ptr<Message>&)>
MessageCallback;
QPID_BROKER_EXTERN Message(const framing::SequenceNumber& id =
framing::SequenceNumber());
- QPID_BROKER_EXTERN Message(const Message&);
QPID_BROKER_EXTERN ~Message();
uint64_t getPersistenceId() const { return persistenceId; }
@@ -75,7 +78,6 @@ public:
bool isImmediate() const;
QPID_BROKER_EXTERN const framing::FieldTable* getApplicationHeaders()
const;
QPID_BROKER_EXTERN std::string getAppId() const;
- framing::FieldTable& getOrInsertHeaders();
QPID_BROKER_EXTERN bool isPersistent() const;
bool requiresAccept();
@@ -85,18 +87,19 @@ public:
sys::AbsTime getExpiration() const { return expiration; }
void setExpiration(sys::AbsTime exp) { expiration = exp; }
void adjustTtl();
+ void setRedelivered();
+ void insertCustomProperty(const std::string& key, int64_t value);
+ void insertCustomProperty(const std::string& key, const std::string&
value);
+ void removeCustomProperty(const std::string& key);
+ void setExchange(const std::string&);
+ void clearApplicationHeadersFlag();
framing::FrameSet& getFrames() { return frames; }
const framing::FrameSet& getFrames() const { return frames; }
- template <class T> T* getProperties() {
- qpid::framing::AMQHeaderBody* p = frames.getHeaders();
- return p->get<T>(true);
- }
-
template <class T> const T* getProperties() const {
const qpid::framing::AMQHeaderBody* p = frames.getHeaders();
- return p->get<T>(true);
+ return p->get<T>();
}
template <class T> const T* hasProperties() const {
@@ -156,9 +159,8 @@ public:
bool isExcluded(const std::vector<std::string>& excludes) const;
void addTraceId(const std::string& id);
- void forcePersistent();
- bool isForcedPersistent();
-
+ void forcePersistent();
+ bool isForcedPersistent();
/** Call cb when dequeue is complete, may call immediately. Holds cb by
reference. */
void setDequeueCompleteCallback(MessageCallback& cb);
@@ -178,7 +180,7 @@ public:
bool redelivered;
bool loaded;
bool staged;
- bool forcePersistentPolicy; // used to force message as durable, via a
broker policy
+ bool forcePersistentPolicy; // used to force message as durable, via a
broker policy
ConnectionToken* publisher;
mutable MessageAdapter* adapter;
qpid::sys::AbsTime expiration;
@@ -194,6 +196,15 @@ public:
uint32_t requiredCredit;
bool isManagementMessage;
+ mutable bool copyHeaderOnWrite;
+
+ /**
+ * Expects lock to be held
+ */
+ template <class T> T* getModifiableProperties() {
+ return getHeaderBody()->get<T>(true);
+ }
+ qpid::framing::AMQHeaderBody* getHeaderBody();
};
}}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1148503&r1=1148502&r2=1148503&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Jul 19 19:57:06 2011
@@ -525,7 +525,7 @@ void Queue::push(boost::intrusive_ptr<Me
{
Mutex::ScopedLock locker(messageLock);
QueuedMessage qm(this, msg, ++sequence);
- if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey,
sequence);
+ if (insertSeqNo) msg->insertCustomProperty(seqNoKey, sequence);
dequeueRequired = messages->push(qm, removed);
listeners.populate(copy);
@@ -627,11 +627,6 @@ bool Queue::enqueue(TransactionContext*
}
if (traceId.size()) {
- //copy on write: take deep copy of message before modifying it
- //as the frames may already be available for delivery on other
- //threads
- boost::intrusive_ptr<Message> copy(new Message(*msg));
- msg = copy;
msg->addTraceId(traceId);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=1148503&r1=1148502&r2=1148503&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Tue Jul 19 19:57:06 2011
@@ -318,22 +318,22 @@ class MessageUpdater {
lastPos = message.position;
// if the ttl > 0, we need to send the calculated expiration time to
the updatee
- if (message.payload->getProperties<DeliveryProperties>()->getTtl() >
0) {
+ const DeliveryProperties* dprops =
+ message.payload->getProperties<DeliveryProperties>();
+ if (dprops && dprops->getTtl() > 0) {
bool hadMessageProps =
message.payload->hasProperties<framing::MessageProperties>();
- framing::MessageProperties* mprops =
+ const framing::MessageProperties* mprops =
message.payload->getProperties<framing::MessageProperties>();
bool hadApplicationHeaders = mprops->hasApplicationHeaders();
- FieldTable& applicationHeaders = mprops->getApplicationHeaders();
- applicationHeaders.setInt64(
- UpdateClient::X_QPID_EXPIRATION,
- sys::Duration(sys::EPOCH, message.payload->getExpiration()));
+
message.payload->insertCustomProperty(UpdateClient::X_QPID_EXPIRATION,
+ sys::Duration(sys::EPOCH,
message.payload->getExpiration()));
// If message properties or application headers didn't exist
// prior to us adding data, we want to remove them on the other
side.
if (!hadMessageProps)
-
applicationHeaders.setInt(UpdateClient::X_QPID_NO_MESSAGE_PROPS, 0);
+
message.payload->insertCustomProperty(UpdateClient::X_QPID_NO_MESSAGE_PROPS, 0);
else if (!hadApplicationHeaders)
- applicationHeaders.setInt(UpdateClient::X_QPID_NO_HEADERS, 0);
+
message.payload->insertCustomProperty(UpdateClient::X_QPID_NO_HEADERS, 0);
}
// We can't send a broker::Message via the normal client API,
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp?rev=1148503&r1=1148502&r2=1148503&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp Tue Jul 19 19:57:06 2011
@@ -49,18 +49,18 @@ void UpdateExchange::setProperties(const
// Copy expiration from x-property if present.
if (msg->hasProperties<MessageProperties>()) {
- MessageProperties* mprops = msg->getProperties<MessageProperties>();
+ const MessageProperties* mprops =
msg->getProperties<MessageProperties>();
if (mprops->hasApplicationHeaders()) {
- FieldTable& headers = mprops->getApplicationHeaders();
+ const FieldTable& headers = mprops->getApplicationHeaders();
if (headers.isSet(UpdateClient::X_QPID_EXPIRATION)) {
msg->setExpiration(
sys::AbsTime(sys::EPOCH,
headers.getAsInt64(UpdateClient::X_QPID_EXPIRATION)));
- headers.erase(UpdateClient::X_QPID_EXPIRATION);
+ msg->removeCustomProperty(UpdateClient::X_QPID_EXPIRATION);
// Erase props/headers that were added by the UpdateClient
if (headers.isSet(UpdateClient::X_QPID_NO_MESSAGE_PROPS))
msg->eraseProperties<MessageProperties>();
else if (headers.isSet(UpdateClient::X_QPID_NO_HEADERS))
- mprops->clearApplicationHeadersFlag();
+ msg->clearApplicationHeadersFlag();
}
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp?rev=1148503&r1=1148502&r2=1148503&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp Tue Jul 19 19:57:06 2011
@@ -139,6 +139,11 @@ bool AMQFrame::decode(Buffer& buffer)
return true;
}
+void AMQFrame::cloneBody()
+{
+ body = body->clone();
+}
+
std::ostream& operator<<(std::ostream& out, const AMQFrame& f)
{
return
Modified: qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h?rev=1148503&r1=1148502&r2=1148503&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h Tue Jul 19 19:57:06 2011
@@ -59,6 +59,11 @@ class QPID_COMMON_CLASS_EXTERN AMQFrame
return boost::polymorphic_downcast<const T*>(getBody());
}
+ /**
+ * Take a deep copy of the body currently referenced
+ */
+ void cloneBody();
+
QPID_COMMON_EXTERN void encode(Buffer& buffer) const;
QPID_COMMON_EXTERN bool decode(Buffer& buffer);
QPID_COMMON_EXTERN uint32_t encodedSize() const;
Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=1148503&r1=1148502&r2=1148503&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Tue Jul 19 19:57:06 2011
@@ -614,7 +614,7 @@ void ManagementAgent::sendBufferLH(const
props->setAppId("qmf2");
for (i = headers.begin(); i != headers.end(); ++i) {
- msg->getOrInsertHeaders().setString(i->first, i->second.asString());
+ msg->insertCustomProperty(i->first, i->second.asString());
}
DeliveryProperties* dp =
Modified: qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp?rev=1148503&r1=1148502&r2=1148503&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp Tue Jul 19 19:57:06 2011
@@ -69,10 +69,9 @@ void ReplicatingEventListener::deliverDe
void ReplicatingEventListener::deliverEnqueueMessage(const QueuedMessage&
enqueued)
{
boost::intrusive_ptr<Message> msg(cloneMessage(*(enqueued.queue),
enqueued.payload));
- FieldTable& headers =
msg->getProperties<MessageProperties>()->getApplicationHeaders();
- headers.setString(REPLICATION_TARGET_QUEUE, enqueued.queue->getName());
- headers.setInt(REPLICATION_EVENT_TYPE, ENQUEUE);
- headers.setInt(QUEUE_MESSAGE_POSITION,enqueued.position);
+ msg->insertCustomProperty(REPLICATION_TARGET_QUEUE,
enqueued.queue->getName());
+ msg->insertCustomProperty(REPLICATION_EVENT_TYPE, ENQUEUE);
+ msg->insertCustomProperty(QUEUE_MESSAGE_POSITION,enqueued.position);
route(msg);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp?rev=1148503&r1=1148502&r2=1148503&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp Tue Jul 19 19:57:06 2011
@@ -97,11 +97,10 @@ void ReplicationExchange::handleEnqueueE
} else {
queue->setPosition(seqno1);
- FieldTable& headers =
msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders();
- headers.erase(REPLICATION_TARGET_QUEUE);
- headers.erase(REPLICATION_EVENT_SEQNO);
- headers.erase(REPLICATION_EVENT_TYPE);
- headers.erase(QUEUE_MESSAGE_POSITION);
+ msg.getMessage().removeCustomProperty(REPLICATION_TARGET_QUEUE);
+ msg.getMessage().removeCustomProperty(REPLICATION_EVENT_SEQNO);
+ msg.getMessage().removeCustomProperty(REPLICATION_EVENT_TYPE);
+ msg.getMessage().removeCustomProperty(QUEUE_MESSAGE_POSITION);
msg.deliverTo(queue);
QPID_LOG(debug, "Enqueued replicated message onto " << queueName);
if (mgmtExchange != 0) {
Modified: qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp?rev=1148503&r1=1148502&r2=1148503&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp Tue Jul 19 19:57:06 2011
@@ -253,7 +253,7 @@ QPID_AUTO_TEST_CASE(testIVEOption)
TopicExchange topic ("topic1", false, args);
intrusive_ptr<Message> msg1 = cmessage("direct1", "abc");
-
msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString("a",
"abc");
+ msg1->insertCustomProperty("a", "abc");
DeliverableMessage dmsg1(msg1);
FieldTable args2;
Modified: qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=1148503&r1=1148502&r2=1148503&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/tests/Makefile.am Tue Jul 19 19:57:06 2011
@@ -298,7 +298,7 @@ TESTS_ENVIRONMENT = \
$(srcdir)/run_test
system_tests = qpid-client-test quick_perftest quick_topictest run_header_test
quick_txtest
-TESTS += start_broker $(system_tests) python_tests stop_broker
run_federation_tests \
+TESTS += start_broker $(system_tests) python_tests stop_broker
run_federation_tests run_federation_sys_tests \
run_acl_tests run_cli_tests replication_test dynamic_log_level_test \
run_queue_flow_limit_tests
@@ -315,6 +315,8 @@ EXTRA_DIST += \
config.null \
ais_check \
run_federation_tests \
+ run_federation_sys_tests \
+ run_long_federation_sys_tests \
run_cli_tests \
run_acl_tests \
.valgrind.supp \
@@ -352,6 +354,7 @@ CLEANFILES+=valgrind.out *.log *.vglog*
# Not run under valgrind, too slow
LONG_TESTS+=start_broker fanout_perftest shared_perftest multiq_perftest
topic_perftest run_ring_queue_test stop_broker \
+ run_long_federation_sys_tests \
run_failover_soak reliable_replication_test \
federated_cluster_test_with_node_failure
Modified: qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=1148503&r1=1148502&r2=1148503&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Tue Jul 19 19:57:06 2011
@@ -81,13 +81,14 @@ public:
Message& getMessage() { return *(msg.get()); }
};
-intrusive_ptr<Message> create_message(std::string exchange, std::string
routingKey) {
+intrusive_ptr<Message> create_message(std::string exchange, std::string
routingKey, uint64_t ttl = 0) {
intrusive_ptr<Message> msg(new Message());
AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
AMQFrame header((AMQHeaderBody()));
msg->getFrames().append(method);
msg->getFrames().append(header);
msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
+ if (ttl)
msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setTtl(ttl);
return msg;
}
@@ -437,10 +438,10 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){
BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
-
msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
-
msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
-
msg3->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
-
msg4->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+ msg1->insertCustomProperty(key,"a");
+ msg2->insertCustomProperty(key,"b");
+ msg3->insertCustomProperty(key,"c");
+ msg4->insertCustomProperty(key,"a");
//enqueue 4 message
queue->deliver(msg1);
@@ -462,9 +463,9 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){
intrusive_ptr<Message> msg5 = create_message("e", "A");
intrusive_ptr<Message> msg6 = create_message("e", "B");
intrusive_ptr<Message> msg7 = create_message("e", "C");
-
msg5->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
-
msg6->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
-
msg7->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
+ msg5->insertCustomProperty(key,"a");
+ msg6->insertCustomProperty(key,"b");
+ msg7->insertCustomProperty(key,"c");
queue->deliver(msg5);
queue->deliver(msg6);
queue->deliver(msg7);
@@ -499,7 +500,7 @@ QPID_AUTO_TEST_CASE(testLVQEmptyKey){
BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
-
msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+ msg1->insertCustomProperty(key,"a");
queue->deliver(msg1);
queue->deliver(msg2);
BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
@@ -531,12 +532,12 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
-
msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
-
msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
-
msg3->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
-
msg4->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
-
msg5->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
-
msg6->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
+ msg1->insertCustomProperty(key,"a");
+ msg2->insertCustomProperty(key,"b");
+ msg3->insertCustomProperty(key,"c");
+ msg4->insertCustomProperty(key,"a");
+ msg5->insertCustomProperty(key,"b");
+ msg6->insertCustomProperty(key,"c");
//enqueue 4 message
queue->deliver(msg1);
@@ -600,8 +601,8 @@ QPID_AUTO_TEST_CASE(testLVQMultiQueue){
args.getLVQKey(key);
BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
-
msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
-
msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+ msg1->insertCustomProperty(key,"a");
+ msg2->insertCustomProperty(key,"a");
queue1->deliver(msg1);
queue2->deliver(msg1);
@@ -644,8 +645,8 @@ QPID_AUTO_TEST_CASE(testLVQRecover){
args.getLVQKey(key);
BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
-
msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
-
msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+ msg1->insertCustomProperty(key,"a");
+ msg2->insertCustomProperty(key,"a");
// 3
queue1->deliver(msg1);
// 4
@@ -665,12 +666,7 @@ QPID_AUTO_TEST_CASE(testLVQRecover){
void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint
evenTtl = 0)
{
for (uint i = 0; i < count; i++) {
- intrusive_ptr<Message> m = create_message("exchange", "key");
- if (i % 2) {
- if (oddTtl) m->getProperties<DeliveryProperties>()->setTtl(oddTtl);
- } else {
- if (evenTtl)
m->getProperties<DeliveryProperties>()->setTtl(evenTtl);
- }
+ intrusive_ptr<Message> m = create_message("exchange", "key", i % 2 ?
oddTtl : evenTtl);
m->setTimestamp(new broker::ExpiryPolicy);
queue.deliver(m);
}
Modified: qpid/trunk/qpid/cpp/src/tests/TxPublishTest.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/TxPublishTest.cpp?rev=1148503&r1=1148502&r2=1148503&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/TxPublishTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/TxPublishTest.cpp Tue Jul 19 19:57:06 2011
@@ -50,10 +50,9 @@ struct TxPublishTest
TxPublishTest() :
queue1(new Queue("queue1", false, &store, 0)),
queue2(new Queue("queue2", false, &store, 0)),
- msg(MessageUtils::createMessage("exchange", "routing_key", false,
"id")),
+ msg(MessageUtils::createMessage("exchange", "routing_key", true)),
op(msg)
{
- msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT);
op.deliverTo(queue1);
op.deliverTo(queue2);
}
Modified: qpid/trunk/qpid/cpp/src/tests/brokertest.py
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/brokertest.py?rev=1148503&r1=1148502&r2=1148503&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/brokertest.py Tue Jul 19 19:57:06 2011
@@ -251,7 +251,7 @@ class Broker(Popen):
def get_log(self):
return os.path.abspath(self.log)
- def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING,
port=0, log_level=None, wait=None):
+ def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING,
port=0, log_level=None, wait=None, show_cmd=False):
"""Start a broker daemon. name determines the data-dir and log
file names."""
@@ -280,6 +280,7 @@ class Broker(Popen):
cmd += ["--log-enable=%s" % log_level]
self.datadir = self.name
cmd += ["--data-dir", self.datadir]
+ if show_cmd: print cmd
Popen.__init__(self, cmd, expect, stdout=PIPE)
test.cleanup_stop(self)
self._host = "127.0.0.1"
@@ -400,7 +401,7 @@ class Cluster:
_cluster_count = 0
- def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING,
wait=True):
+ def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING,
wait=True, show_cmd=False):
self.test = test
self._brokers=[]
self.name = "cluster%d" % Cluster._cluster_count
@@ -411,16 +412,16 @@ class Cluster:
self.args += [ "--log-enable=info+", "--log-enable=debug+:cluster"]
assert BrokerTest.cluster_lib, "Cannot locate cluster plug-in"
self.args += [ "--load-module", BrokerTest.cluster_lib ]
- self.start_n(count, expect=expect, wait=wait)
+ self.start_n(count, expect=expect, wait=wait, show_cmd=show_cmd)
- def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[],
port=0):
+ def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[],
port=0, show_cmd=False):
"""Add a broker to the cluster. Returns the index of the new broker."""
if not name: name="%s-%d" % (self.name, len(self._brokers))
- self._brokers.append(self.test.broker(self.args+args, name, expect,
wait, port=port))
+ self._brokers.append(self.test.broker(self.args+args, name, expect,
wait, port=port, show_cmd=show_cmd))
return self._brokers[-1]
- def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[]):
- for i in range(count): self.start(expect=expect, wait=wait, args=args)
+ def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[],
show_cmd=False):
+ for i in range(count): self.start(expect=expect, wait=wait, args=args,
show_cmd=show_cmd)
# Behave like a list of brokers.
def __len__(self): return len(self._brokers)
@@ -477,18 +478,18 @@ class BrokerTest(TestCase):
self.cleanup_stop(p)
return p
- def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True,
port=0, log_level=None):
+ def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True,
port=0, log_level=None, show_cmd=False):
"""Create and return a broker ready for use"""
- b = Broker(self, args=args, name=name, expect=expect, port=port,
log_level=log_level)
+ b = Broker(self, args=args, name=name, expect=expect, port=port,
log_level=log_level, show_cmd=show_cmd)
if (wait):
try: b.ready()
except Exception, e:
raise RethrownException("Failed to start broker %s(%s): %s" %
(b.name, b.log, e))
return b
- def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True):
+ def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True,
show_cmd=False):
"""Create and return a cluster ready for use"""
- cluster = Cluster(self, count, args, expect=expect, wait=wait)
+ cluster = Cluster(self, count, args, expect=expect, wait=wait,
show_cmd=show_cmd)
return cluster
def browse(self, session, queue, timeout=0):
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]