Author: aconway
Date: Fri Dec 11 20:55:45 2009
New Revision: 889813
URL: http://svn.apache.org/viewvc?rev=889813&view=rev
Log:
QPID-2266: error sending update: Enqueue capacity threshold exceeded
Fix for the problem, plus a test to verify that messages going to the store
have the same headers and content-size whether the broker is an updatee or
receives the publish directly.
Added:
qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp
- copied, changed from r889736,
qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h
Modified:
qpid/trunk/qpid/cpp/src/cluster.cmake
qpid/trunk/qpid/cpp/src/cluster.mk
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h
qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp
qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h
qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
qpid/trunk/qpid/cpp/src/tests/test_store.cpp
qpid/trunk/qpid/python/qpid/brokertest.py
Modified: qpid/trunk/qpid/cpp/src/cluster.cmake
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.cmake?rev=889813&r1=889812&r2=889813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/cluster.cmake (original)
+++ qpid/trunk/qpid/cpp/src/cluster.cmake Fri Dec 11 20:55:45 2009
@@ -109,6 +109,7 @@
qpid/cluster/ExpiryPolicy.cpp
qpid/cluster/FailoverExchange.cpp
qpid/cluster/FailoverExchange.h
+ qpid/cluster/UpdateExchange.cpp
qpid/cluster/UpdateExchange.h
qpid/cluster/UpdateReceiver.h
qpid/cluster/LockedConnectionMap.h
Modified: qpid/trunk/qpid/cpp/src/cluster.mk
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.mk?rev=889813&r1=889812&r2=889813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ qpid/trunk/qpid/cpp/src/cluster.mk Fri Dec 11 20:55:45 2009
@@ -69,6 +69,7 @@
qpid/cluster/FailoverExchange.cpp \
qpid/cluster/FailoverExchange.h \
qpid/cluster/UpdateExchange.h \
+ qpid/cluster/UpdateExchange.cpp \
qpid/cluster/UpdateReceiver.h \
qpid/cluster/LockedConnectionMap.h \
qpid/cluster/Multicaster.cpp \
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=889813&r1=889812&r2=889813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Fri Dec 11 20:55:45 2009
@@ -154,6 +154,7 @@
queueCleaner(queues, timer),
queueEvents(poller,!conf.asyncQueueEvents),
recovery(true),
+ clusterUpdatee(false),
expiryPolicy(new ExpiryPolicy),
connectionCounter(conf.maxConnections),
getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=889813&r1=889812&r2=889813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Fri Dec 11 20:55:45 2009
@@ -168,8 +168,10 @@
std::vector<Url> getKnownBrokersImpl();
std::string federationTag;
bool recovery;
+ bool clusterUpdatee;
boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
ConnectionCounter connectionCounter;
+
public:
virtual ~Broker();
@@ -259,6 +261,9 @@
void setRecovery(bool set) { recovery = set; }
bool getRecovery() const { return recovery; }
+ void setClusterUpdatee(bool set) { clusterUpdatee = set; }
+ bool isClusterUpdatee() const { return clusterUpdatee; }
+
management::ManagementAgent* getManagementAgent() { return
managementAgent.get(); }
ConnectionCounter& getConnectionCounter() {return connectionCounter;}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=889813&r1=889812&r2=889813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Fri Dec 11 20:55:45 2009
@@ -425,19 +425,4 @@
return getProperties<MessageProperties>()->getApplicationHeaders();
}
-
-void Message::setUpdateDestination(const std::string& d)
-{
- updateDestination = d;
-}
-
-
-bool Message::isUpdateMessage()
-{
- return updateDestination.size() && isA<MessageTransferBody>()
- && getMethod<MessageTransferBody>()->getDestination() ==
updateDestination;
-}
-
-std::string Message::updateDestination;
-
}} // namespace qpid::broker
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=889813&r1=889812&r2=889813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Fri Dec 11 20:55:45 2009
@@ -104,6 +104,10 @@
return frames.as<T>();
}
+ template <class T> T* getMethod() {
+ return frames.as<T>();
+ }
+
template <class T> bool isA() const {
return frames.isA<T>();
}
@@ -157,9 +161,6 @@
void setDequeueCompleteCallback(MessageCallback& cb);
void resetDequeueCompleteCallback();
- bool isUpdateMessage();
- static void setUpdateDestination(const std::string&);
-
private:
typedef std::map<const Queue*,boost::intrusive_ptr<Message> > Replacement;
@@ -190,7 +191,6 @@
MessageCallback* dequeueCallback;
uint32_t requiredCredit;
- static std::string updateDestination;
};
}}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=889813&r1=889812&r2=889813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Fri Dec 11 20:55:45 2009
@@ -598,7 +598,7 @@
string key = ft->getAsString(qpidVQMatchProperty);
i = lvq.find(key);
- if (i == lvq.end() || msg->isUpdateMessage()){
+ if (i == lvq.end() || (broker && broker->isClusterUpdatee())) {
messages.push_back(qm);
listeners.populate(copy);
lvq[key] = msg;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=889813&r1=889812&r2=889813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Dec 11 20:55:45 2009
@@ -619,6 +619,7 @@
if (initMap.isUpdateNeeded()) { // Joining established cluster.
broker.setRecovery(false); // Ditch my current store.
+ broker.setClusterUpdatee(true);
state = JOINER;
}
else { // I can go ready.
@@ -813,6 +814,7 @@
memberUpdate(l);
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()),
self);
state = CATCHUP;
+ broker.setClusterUpdatee(false);
discarding = false; // ok to set, we're stalled for update.
QPID_LOG(notice, *this << " update complete, starting catch-up.");
deliverEventQueue.start();
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=889813&r1=889812&r2=889813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Fri Dec 11 20:55:45
2009
@@ -139,7 +139,6 @@
broker->setConnectionFactory(
boost::shared_ptr<sys::ConnectionCodec::Factory>(
new ConnectionCodec::Factory(broker->getConnectionFactory(),
*cluster)));
- broker::Message::setUpdateDestination(UpdateClient::UPDATE);
ManagementAgent* mgmt = broker->getManagementAgent();
if (mgmt) {
std::auto_ptr<IdAllocator> allocator(new
UpdateClientIdAllocator());
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=889813&r1=889812&r2=889813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Fri Dec 11 20:55:45
2009
@@ -217,10 +217,11 @@
// Disable client code that clears the delivery-properties.exchange
sb.get()->setDoClearDeliveryPropertiesExchange(false);
framing::MessageTransferBody transfer(
- framing::ProtocolVersion(), UpdateClient::UPDATE,
message::ACCEPT_MODE_NONE,
- message::ACQUIRE_MODE_PRE_ACQUIRED);
+ *message.payload->getFrames().as<framing::MessageTransferBody>());
+ transfer.setDestination(UpdateClient::UPDATE);
- sb.get()->send(transfer, message.payload->getFrames(),
!message.payload->isContentReleased());
+ sb.get()->send(transfer, message.payload->getFrames(),
+ !message.payload->isContentReleased());
if (message.payload->isContentReleased()){
uint16_t maxFrameSize =
sb.get()->getConnection()->getNegotiatedSettings().maxFrameSize;
uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
Copied: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp (from r889736,
qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h)
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp?p2=qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp&p1=qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h&r1=889736&r2=889813&rev=889813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp Fri Dec 11 20:55:45
2009
@@ -1,6 +1,3 @@
-#ifndef QPID_CLUSTER_UPDATEEXCHANGE_H
-#define QPID_CLUSTER_UPDATEEXCHANGE_H
-
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -21,25 +18,30 @@
* under the License.
*
*/
-
-#include "qpid/cluster/UpdateClient.h"
-#include "qpid/broker/FanOutExchange.h"
-
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/broker/Message.h"
+#include "UpdateExchange.h"
namespace qpid {
namespace cluster {
-/**
- * A keyless exchange (like fanout exchange) that does not modify
delivery-properties.exchange
- * on messages.
- */
-class UpdateExchange : public broker::FanOutExchange
-{
- public:
- UpdateExchange(management::Manageable* parent) :
broker::Exchange(UpdateClient::UPDATE, parent),
broker::FanOutExchange(UpdateClient::UPDATE, parent) {}
- void setProperties(const boost::intrusive_ptr<broker::Message>&) {}
-};
+using framing::MessageTransferBody;
+using framing::DeliveryProperties;
-}} // namespace qpid::cluster
+UpdateExchange::UpdateExchange(management::Manageable* parent)
+ : broker::Exchange(UpdateClient::UPDATE, parent),
+ broker::FanOutExchange(UpdateClient::UPDATE, parent) {}
+
+
+void UpdateExchange::setProperties(const
boost::intrusive_ptr<broker::Message>& msg) {
+ MessageTransferBody* transfer = msg->getMethod<MessageTransferBody>();
+ assert(transfer);
+ const DeliveryProperties* props = msg->getProperties<DeliveryProperties>();
+ assert(props);
+ if (props->hasExchange())
+ transfer->setDestination(props->getExchange());
+ else
+ transfer->clearDestinationFlag();
+}
-#endif /*!QPID_CLUSTER_UPDATEEXCHANGE_H*/
+}} // namespace qpid::cluster
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h?rev=889813&r1=889812&r2=889813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h Fri Dec 11 20:55:45
2009
@@ -30,14 +30,14 @@
namespace cluster {
/**
- * A keyless exchange (like fanout exchange) that does not modify
delivery-properties.exchange
- * on messages.
+ * A keyless exchange (like fanout exchange) that does not modify
+ * delivery-properties.exchange but copies it to the MessageTransfer.
*/
class UpdateExchange : public broker::FanOutExchange
{
public:
- UpdateExchange(management::Manageable* parent) :
broker::Exchange(UpdateClient::UPDATE, parent),
broker::FanOutExchange(UpdateClient::UPDATE, parent) {}
- void setProperties(const boost::intrusive_ptr<broker::Message>&) {}
+ UpdateExchange(management::Manageable* parent);
+ void setProperties(const boost::intrusive_ptr<broker::Message>&);
};
}} // namespace qpid::cluster
Modified: qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp?rev=889813&r1=889812&r2=889813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp Fri Dec 11 20:55:45 2009
@@ -52,6 +52,11 @@
return parts.empty() ? 0 : parts[0].getMethod();
}
+AMQMethodBody* FrameSet::getMethod()
+{
+ return parts.empty() ? 0 : parts[0].getMethod();
+}
+
const AMQHeaderBody* FrameSet::getHeaders() const
{
return parts.size() < 2 ? 0 : parts[1].castBody<AMQHeaderBody>();
Modified: qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h?rev=889813&r1=889812&r2=889813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h Fri Dec 11 20:55:45 2009
@@ -57,6 +57,7 @@
bool isContentBearing() const;
QPID_COMMON_EXTERN const AMQMethodBody* getMethod() const;
+ QPID_COMMON_EXTERN AMQMethodBody* getMethod();
QPID_COMMON_EXTERN const AMQHeaderBody* getHeaders() const;
QPID_COMMON_EXTERN AMQHeaderBody* getHeaders();
@@ -70,6 +71,11 @@
return (method && method->isA<T>()) ? dynamic_cast<const T*>(method) :
0;
}
+ template <class T> T* as() {
+ AMQMethodBody* method = getMethod();
+ return (method && method->isA<T>()) ? dynamic_cast<T*>(method) : 0;
+ }
+
template <class T> const T* getHeaderProperties() const {
const AMQHeaderBody* header = getHeaders();
return header ? header->get<T>() : 0;
Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=889813&r1=889812&r2=889813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Fri Dec 11 20:55:45 2009
@@ -57,6 +57,34 @@
self.assertEqual("y", m.content)
s2.connection.close()
+ def test_store_direct_update_match(self):
+ """Verify that brokers stores an identical message whether they
receive it
+ direct from clients or during an update, no header or other
differences"""
+ cluster = self.cluster(0, args=["--load-module", self.test_store_lib])
+ cluster.start(args=["--test-store-dump", "direct.dump"])
+ # Try messages with various headers
+ cluster[0].send_message("q", Message(durable=True, content="foobar",
+ subject="subject",
+ reply_to="reply_to",
+ properties={"n":10}))
+ # Try messages of different sizes
+ for size in range(0,10000,100):
+ cluster[0].send_message("q", Message(content="x"*size,
durable=True))
+ # Try sending via named exchange
+ c = cluster[0].connect_old()
+ s = c.session(str(qpid.datatypes.uuid4()))
+ s.exchange_bind(exchange="amq.direct", binding_key="foo", queue="q")
+ props = s.delivery_properties(routing_key="foo", delivery_mode=2)
+ s.message_transfer(
+ destination="amq.direct",
+ message=qpid.datatypes.Message(props, "content"))
+
+ # Now update a new member and compare their dumps.
+ cluster.start(args=["--test-store-dump", "updatee.dump"])
+ assert file("direct.dump").read() == file("updatee.dump").read()
+ os.remove("direct.dump")
+ os.remove("updatee.dump")
+
class LongTests(BrokerTest):
"""Tests that can run for a long time if -DDURATION=<minutes> is set"""
def duration(self):
Modified: qpid/trunk/qpid/cpp/src/tests/test_store.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/test_store.cpp?rev=889813&r1=889812&r2=889813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/test_store.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/test_store.cpp Fri Dec 11 20:55:45 2009
@@ -34,11 +34,14 @@
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/Broker.h"
+#include "qpid/framing/AMQFrame.h"
#include "qpid/log/Statement.h"
#include "qpid/Plugin.h"
#include "qpid/Options.h"
#include <boost/cast.hpp>
#include <boost/lexical_cast.hpp>
+#include <memory>
+#include <fstream>
using namespace qpid;
using namespace broker;
@@ -51,10 +54,13 @@
struct TestStoreOptions : public Options {
string name;
+ string dump;
TestStoreOptions() : Options("Test Store Options") {
addOptions()
- ("test-store-name", optValue(name, "NAME"), "Name to identify test
store instance.");
+ ("test-store-name", optValue(name, "NAME"), "Name of test store
instance.")
+ ("test-store-dump", optValue(dump, "FILE"), "File to dump enqueued
messages.")
+ ;
}
};
@@ -71,19 +77,38 @@
class TestStore : public NullMessageStore {
public:
- TestStore(const string& name_, Broker& broker_) : name(name_),
broker(broker_) {}
+ TestStore(const TestStoreOptions& opts, Broker& broker_)
+ : options(opts), name(opts.name), broker(broker_)
+ {
+ QPID_LOG(info, "TestStore name=" << name << " dump=" << options.dump);
+ if (!options.dump.empty())
+ dump.reset(new ofstream(options.dump.c_str()));
+ }
~TestStore() {
for_each(threads.begin(), threads.end(), boost::bind(&Thread::join,
_1));
}
+ virtual bool isNull() const { return false; }
+
void enqueue(TransactionContext* ,
- const boost::intrusive_ptr<PersistableMessage>& msg,
+ const boost::intrusive_ptr<PersistableMessage>& pmsg,
const PersistableQueue& )
{
- string data =
boost::polymorphic_downcast<Message*>(msg.get())->getFrames().getContent();
+ Message* msg = dynamic_cast<Message*>(pmsg.get());
+ assert(msg);
+
+ // Dump the message if there is a dump file.
+ if (dump.get()) {
+ msg->getFrames().getMethod()->print(*dump);
+ *dump << endl << " ";
+ msg->getFrames().getHeaders()->print(*dump);
+ *dump << endl << " ";
+ *dump << msg->getFrames().getContentSize() << endl;
+ }
// Check the message for special instructions.
+ string data = msg->getFrames().getContent();
size_t i = string::npos;
size_t j = string::npos;
if (strncmp(data.c_str(), TEST_STORE_DO.c_str(),
strlen(TEST_STORE_DO.c_str())) == 0
@@ -119,9 +144,11 @@
private:
static const string TEST_STORE_DO, EXCEPTION, EXIT_PROCESS, ASYNC;
+ TestStoreOptions options;
string name;
Broker& broker;
vector<Thread> threads;
+ std::auto_ptr<ofstream> dump;
};
const string TestStore::TEST_STORE_DO = "TEST_STORE_DO: ";
@@ -139,7 +166,7 @@
{
Broker* broker = dynamic_cast<Broker*>(&target);
if (!broker) return;
- boost::shared_ptr<MessageStore> p(new TestStore(options.name,
*broker));
+ boost::shared_ptr<MessageStore> p(new TestStore(options, *broker));
broker->setStore (p);
}
Modified: qpid/trunk/qpid/python/qpid/brokertest.py
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/brokertest.py?rev=889813&r1=889812&r2=889813&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/brokertest.py (original)
+++ qpid/trunk/qpid/python/qpid/brokertest.py Fri Dec 11 20:55:45 2009
@@ -259,15 +259,15 @@
self.args += [ "--load-module", BrokerTest.cluster_lib ]
self.start_n(count, expect=expect, wait=wait)
- def start(self, name=None, expect=EXPECT_RUNNING, wait=True):
+ def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[]):
"""Add a broker to the cluster. Returns the index of the new broker."""
if not name: name="%s-%d" % (self.name, len(self._brokers))
log.debug("Cluster %s starting member %s" % (self.name, name))
- self._brokers.append(self.test.broker(self.args, name, expect, wait))
+ self._brokers.append(self.test.broker(self.args+args, name, expect,
wait))
return self._brokers[-1]
- def start_n(self, count, expect=EXPECT_RUNNING, wait=True):
- for i in range(count): self.start(expect=expect, wait=wait)
+ def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[]):
+ for i in range(count): self.start(expect=expect, wait=wait, args=args)
# Behave like a list of brokers.
def __len__(self): return len(self._brokers)
@@ -289,6 +289,7 @@
receiver_exec = os.getenv("RECEIVER_EXEC")
sender_exec = os.getenv("SENDER_EXEC")
store_lib = os.getenv("STORE_LIB")
+ test_store_lib = os.getenv("TEST_STORE_LIB")
rootdir = os.getcwd()
def configure(self, config): self.config=config
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]