Author: aconway
Date: Thu Jan 19 23:02:12 2012
New Revision: 1233634
URL: http://svn.apache.org/viewvc?rev=1233634&view=rev
Log:
QPID-3603: Automatic wiring replication for HA.
Automatic replication of queues an exchanges. Bidnings TBD.
Get rid of long delay establishing connections:
- broker/Connection.cpp: requestIOProcessing() called before connection open
saves work till connection is open.
- broker/Link.cpp,LinkRegistry: Fix some const correctness errors.
Added:
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp (with props)
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.h
- copied, changed from r1233633,
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Settings.h
qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py (with props)
Modified:
qpid/branches/qpid-3603-2/qpid/cpp/src/ha.mk
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Connection.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.h
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/LinkRegistry.h
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.h
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/cluster/Connection.h
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.h
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaPlugin.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Settings.h
qpid/branches/qpid-3603-2/qpid/cpp/src/tests/brokertest.py
qpid/branches/qpid-3603-2/qpid/cpp/src/tests/test_env.sh.in
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/ha.mk
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/ha.mk?rev=1233634&r1=1233633&r2=1233634&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/ha.mk (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/ha.mk Thu Jan 19 23:02:12 2012
@@ -26,6 +26,8 @@ ha_la_SOURCES = \
qpid/ha/HaPlugin.cpp \
qpid/ha/HaBroker.cpp \
qpid/ha/HaBroker.h \
+ qpid/ha/Backup.cpp \
+ qpid/ha/Backup.h \
qpid/ha/Settings.h
ha_la_LIBADD = libqpidbroker.la
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Connection.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Connection.cpp?rev=1233634&r1=1233633&r2=1233634&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Connection.cpp Thu Jan
19 23:02:12 2012
@@ -130,7 +130,7 @@ void Connection::requestIOProcessing(boo
{
ScopedLock<Mutex> l(ioCallbackLock);
ioCallbacks.push(callback);
- out.activateOutput();
+ if (isOpen()) out.activateOutput();
}
Connection::~Connection()
@@ -156,11 +156,14 @@ Connection::~Connection()
void Connection::received(framing::AMQFrame& frame) {
// Received frame on connection so delay timeout
restartTimeout();
+ bool wasOpen = isOpen();
adapter.handle(frame);
if (link) //i.e. we are acting as the client to another broker
recordFromServer(frame);
else
recordFromClient(frame);
+ if (!wasOpen && isOpen())
+ doIoCallbacks(); // Do any callbacks registered before we opened.
}
void Connection::sent(const framing::AMQFrame& frame)
@@ -329,17 +332,16 @@ void Connection::closed(){ // Physically
}
void Connection::doIoCallbacks() {
- {
- ScopedLock<Mutex> l(ioCallbackLock);
- // Although IO callbacks execute in the connection thread context,
they are
- // not cluster safe because they are queued for execution in non-IO
threads.
- ClusterUnsafeScope cus;
- while (!ioCallbacks.empty()) {
- boost::function0<void> cb = ioCallbacks.front();
- ioCallbacks.pop();
- ScopedUnlock<Mutex> ul(ioCallbackLock);
- cb(); // Lend the IO thread for management processing
- }
+ if (!isOpen()) return; // Don't process IO callbacks until we are open.
+ ScopedLock<Mutex> l(ioCallbackLock);
+ // Although IO callbacks execute in the connection thread context, they are
+ // not cluster safe because they are queued for execution in non-IO
threads.
+ ClusterUnsafeScope cus;
+ while (!ioCallbacks.empty()) {
+ boost::function0<void> cb = ioCallbacks.front();
+ ioCallbacks.pop();
+ ScopedUnlock<Mutex> ul(ioCallbackLock);
+ cb(); // Lend the IO thread for management processing
}
}
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.cpp?rev=1233634&r1=1233633&r2=1233634&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.cpp Thu Jan 19
23:02:12 2012
@@ -47,13 +47,13 @@ namespace _qmf = qmf::org::apache::qpid:
Link::Link(LinkRegistry* _links,
MessageStore* _store,
- string& _host,
+ const string& _host,
uint16_t _port,
- string& _transport,
+ const string& _transport,
bool _durable,
- string& _authMechanism,
- string& _username,
- string& _password,
+ const string& _authMechanism,
+ const string& _username,
+ const string& _password,
Broker* _broker,
Manageable* parent)
: links(_links), store(_store), host(_host), port(_port),
@@ -79,6 +79,7 @@ Link::Link(LinkRegistry* _links,
}
}
setStateLH(STATE_WAITING);
+ startConnectionLH();
}
Link::~Link ()
@@ -213,28 +214,30 @@ void Link::add(Bridge::shared_ptr bridge
{
Mutex::ScopedLock mutex(lock);
created.push_back (bridge);
+ if (connection)
+ connection->requestIOProcessing
(boost::bind(&Link::ioThreadProcessing, this));
+
}
void Link::cancel(Bridge::shared_ptr bridge)
{
- {
- Mutex::ScopedLock mutex(lock);
+ Mutex::ScopedLock mutex(lock);
- for (Bridges::iterator i = created.begin(); i != created.end(); i++) {
- if ((*i).get() == bridge.get()) {
- created.erase(i);
- break;
- }
+ for (Bridges::iterator i = created.begin(); i != created.end(); i++) {
+ if ((*i).get() == bridge.get()) {
+ created.erase(i);
+ break;
}
- for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
- if ((*i).get() == bridge.get()) {
- cancellations.push_back(bridge);
- bridge->closed();
- active.erase(i);
- break;
- }
+ }
+ for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+ if ((*i).get() == bridge.get()) {
+ cancellations.push_back(bridge);
+ bridge->closed();
+ active.erase(i);
+ break;
}
}
+
if (!cancellations.empty()) {
connection->requestIOProcessing
(boost::bind(&Link::ioThreadProcessing, this));
}
@@ -284,6 +287,8 @@ void Link::setConnection(Connection* c)
Mutex::ScopedLock mutex(lock);
connection = c;
updateUrls = true;
+ // Process any IO tasks bridges added before setConnection.
+ connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing,
this));
}
void Link::maintenanceVisit ()
@@ -313,7 +318,7 @@ void Link::maintenanceVisit ()
}
else if (state == STATE_OPERATIONAL && (!active.empty() ||
!created.empty() || !cancellations.empty()) && connection != 0)
connection->requestIOProcessing
(boost::bind(&Link::ioThreadProcessing, this));
-}
+ }
void Link::reconnect(const qpid::Address& a)
{
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.h?rev=1233634&r1=1233633&r2=1233634&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.h Thu Jan 19
23:02:12 2012
@@ -92,13 +92,13 @@ namespace qpid {
Link(LinkRegistry* links,
MessageStore* store,
- std::string& host,
+ const std::string& host,
uint16_t port,
- std::string& transport,
+ const std::string& transport,
bool durable,
- std::string& authMechanism,
- std::string& username,
- std::string& password,
+ const std::string& authMechanism,
+ const std::string& username,
+ const std::string& password,
Broker* broker,
management::Manageable* parent = 0);
virtual ~Link();
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/LinkRegistry.cpp?rev=1233634&r1=1233633&r2=1233634&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/LinkRegistry.cpp Thu Jan
19 23:02:12 2012
@@ -124,13 +124,13 @@ bool LinkRegistry::updateAddress(const s
}
}
-pair<Link::shared_ptr, bool> LinkRegistry::declare(string& host,
+pair<Link::shared_ptr, bool> LinkRegistry::declare(const string& host,
uint16_t port,
- string& transport,
+ const string& transport,
bool durable,
- string& authMechanism,
- string& username,
- string& password)
+ const string&
authMechanism,
+ const string& username,
+ const string& password)
{
Mutex::ScopedLock locker(lock);
@@ -151,16 +151,16 @@ pair<Link::shared_ptr, bool> LinkRegistr
return std::pair<Link::shared_ptr, bool>(i->second, false);
}
-pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host,
+pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& host,
uint16_t port,
bool durable,
- std::string& src,
- std::string& dest,
- std::string& key,
+ const std::string& src,
+ const std::string& dest,
+ const std::string& key,
bool isQueue,
bool isLocal,
- std::string& tag,
- std::string& excludes,
+ const std::string& tag,
+ const std::string&
excludes,
bool dynamic,
uint16_t sync)
{
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/LinkRegistry.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/LinkRegistry.h?rev=1233634&r1=1233633&r2=1233634&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/LinkRegistry.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/LinkRegistry.h Thu Jan
19 23:02:12 2012
@@ -84,24 +84,24 @@ namespace broker {
~LinkRegistry();
std::pair<boost::shared_ptr<Link>, bool>
- declare(std::string& host,
+ declare(const std::string& host,
uint16_t port,
- std::string& transport,
+ const std::string& transport,
bool durable,
- std::string& authMechanism,
- std::string& username,
- std::string& password);
+ const std::string& authMechanism,
+ const std::string& username,
+ const std::string& password);
std::pair<Bridge::shared_ptr, bool>
- declare(std::string& host,
+ declare(const std::string& host,
uint16_t port,
bool durable,
- std::string& src,
- std::string& dest,
- std::string& key,
+ const std::string& src,
+ const std::string& dest,
+ const std::string& key,
bool isQueue,
bool isLocal,
- std::string& id,
- std::string& excludes,
+ const std::string& id,
+ const std::string& excludes,
bool dynamic,
uint16_t sync);
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.cpp?rev=1233634&r1=1233633&r2=1233634&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.cpp Thu Jan 19
23:02:12 2012
@@ -24,15 +24,19 @@
#include "qpid/log/Statement.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/framing/reply_exceptions.h"
-#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
-#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
+#include "qmf/org/apache/qpid/broker/EventBind.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
+#include "qmf/org/apache/qpid/broker/EventSubscribe.h"
-using qmf::org::apache::qpid::broker::EventQueueDeclare;
-using qmf::org::apache::qpid::broker::EventQueueDelete;
+using qmf::org::apache::qpid::broker::EventBind;
using qmf::org::apache::qpid::broker::EventExchangeDeclare;
using qmf::org::apache::qpid::broker::EventExchangeDelete;
+using qmf::org::apache::qpid::broker::EventQueueDeclare;
+using qmf::org::apache::qpid::broker::EventQueueDelete;
+using qmf::org::apache::qpid::broker::EventSubscribe;
namespace qpid {
namespace broker {
@@ -55,27 +59,47 @@ NodeClone::NodeClone(const std::string&
NodeClone::~NodeClone() {}
+namespace {
+const std::string QPID_REPLICATE("qpid.replicate");
+const std::string ALL("all");
+const std::string WIRING("wiring");
+
+bool isReplicated(const std::string& value) {
+ return value == ALL || value == WIRING;
+}
+bool isReplicated(const framing::FieldTable& f) {
+ return f.isSet(QPID_REPLICATE) &&
isReplicated(f.getAsString(QPID_REPLICATE));
+}
+bool isReplicated(const types::Variant::Map& m) {
+ types::Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
+ return i != m.end() && isReplicated(i->second.asString());
+}
+
+}
+
void NodeClone::route(Deliverable& msg, const std::string& /*key*/, const
qpid::framing::FieldTable* headers)
{
if (isQMFv2(msg.getMessage()) && headers) {
- if (headers->getAsString("qmf.content") == "_event") {
- //decode as list
+ if (headers->getAsString("qmf.content") == "_event") { //decode as list
std::string content = msg.getMessage().getFrames().getContent();
qpid::types::Variant::List list;
qpid::amqp_0_10::ListCodec::decode(content, list);
if (list.empty()) {
- QPID_LOG(error, "Error parsing QMF event, expected non-empty
list");
+ QPID_LOG(error, "Error parsing QMF event, empty list");
} else {
try {
+ // FIXME aconway 2011-11-18: should be iterating list?
qpid::types::Variant::Map& map = list.front().asMap();
qpid::types::Variant::Map& schema =
map["_schema_id"].asMap();
qpid::types::Variant::Map& values = map["_values"].asMap();
if (match<EventQueueDeclare>(schema)) {
- if (values["disp"] == "created" &&
values["args"].asMap()["qpid.propagate"]) {
+ std::string name = values["qName"].asString();
+ if (values["disp"] == "created" &&
isReplicated(values["args"].asMap())) {
+ QPID_LOG(debug, "Creating replicated queue " <<
name);
qpid::framing::FieldTable args;
qpid::amqp_0_10::translate(values["args"].asMap(),
args);
if (!broker.createQueue(
- values["qName"].asString(),
+ name,
values["durable"].asBool(),
values["autoDel"].asBool(),
0 /*i.e. no owner regardless of
exclusivity on master*/,
@@ -83,61 +107,60 @@ void NodeClone::route(Deliverable& msg,
args,
values["user"].asString(),
values["rhost"].asString()).second) {
- QPID_LOG(warning, "Propagatable queue " <<
values["qName"] << " already exists");
+ QPID_LOG(warning, "Replicated queue " << name
<< " already exists");
}
}
} else if (match<EventQueueDelete>(schema)) {
std::string name = values["qName"].asString();
- QPID_LOG(debug, "Notified of deletion of queue " <<
name);
boost::shared_ptr<Queue> queue =
broker.getQueues().find(name);
- if (queue &&
queue->getSettings().isSet("qpid.propagate")/*TODO: check value*/) {
+ if (queue && isReplicated(queue->getSettings())) {
+ QPID_LOG(debug, "Deleting replicated queue " <<
name);
broker.deleteQueue(
name,
values["user"].asString(),
values["rhost"].asString());
- } else {
- if (queue) {
- QPID_LOG(debug, "Ignoring deletion
notification for non-propagated queue " << name);
- } else {
- QPID_LOG(debug, "No such queue " << name);
- }
}
} else if (match<EventExchangeDeclare>(schema)) {
- if (values["disp"] == "created" &&
values["args"].asMap()["qpid.propagate"]) {
+ if (values["disp"] == "created" &&
isReplicated(values["args"].asMap())) {
+ std::string name = values["exName"].asString();
qpid::framing::FieldTable args;
qpid::amqp_0_10::translate(values["args"].asMap(),
args);
+ QPID_LOG(debug, "Creating replicated exchange " <<
name);
if (!broker.createExchange(
- values["exName"].asString(),
+ name,
values["exType"].asString(),
values["durable"].asBool(),
values["altEx"].asString(),
args,
values["user"].asString(),
values["rhost"].asString()).second) {
- QPID_LOG(warning, "Propagatable queue " <<
values["qName"] << " already exists");
+ QPID_LOG(warning, "Replicated exchange " <<
name << " already exists");
}
}
} else if (match<EventExchangeDelete>(schema)) {
std::string name = values["exName"].asString();
- QPID_LOG(debug, "Notified of deletion of exchange " <<
name);
try {
boost::shared_ptr<Exchange> exchange =
broker.getExchanges().get(name);
- if (exchange &&
exchange->getArgs().isSet("qpid.propagate")/*TODO: check value*/) {
+ if (exchange && isReplicated(exchange->getArgs()))
{
+ QPID_LOG(warning, "Deleting replicated
exchange " << name);
broker.deleteExchange(
name,
values["user"].asString(),
values["rhost"].asString());
- } else {
- if (exchange) {
- QPID_LOG(debug, "Ignoring deletion
notification for non-propagated exchange " << name);
- } else {
- QPID_LOG(debug, "No such exchange " <<
name);
- }
}
} catch (const qpid::framing::NotFoundException&) {}
}
+ else if (match<EventBind>(schema)) {
+ QPID_LOG(error, "FIXME NodeClone: Not yet implemented
- replicate bindings.");
+ }
+ else if (match<EventSubscribe>(schema)) {
+ // Deliberately ignore.
+ }
+ else {
+ QPID_LOG(warning, "Replicator received unexpected
event, schema=" << schema);
+ }
} catch (const std::exception& e) {
- QPID_LOG(error, "Error propagating configuration: " <<
e.what());
+ QPID_LOG(error, "Error replicating configuration: " <<
e.what());
}
}
} else if (headers->getAsString("qmf.opcode") == "_query_response") {
@@ -145,15 +168,14 @@ void NodeClone::route(Deliverable& msg,
std::string content = msg.getMessage().getFrames().getContent();
qpid::types::Variant::List list;
qpid::amqp_0_10::ListCodec::decode(content, list);
- QPID_LOG(debug, "Got query response (" << list.size() << ")");
for (qpid::types::Variant::List::iterator i = list.begin(); i !=
list.end(); ++i) {
std::string type =
i->asMap()["_schema_id"].asMap()["_class_name"];
qpid::types::Variant::Map& values =
i->asMap()["_values"].asMap();
- QPID_LOG(debug, "class: " << type << ", values: " << values);
- if (values["arguments"].asMap()["qpid.propagate"]) {
+ if (isReplicated(values["arguments"].asMap())) {
qpid::framing::FieldTable args;
qpid::amqp_0_10::translate(values["arguments"].asMap(),
args);
if (type == "queue") {
+ QPID_LOG(debug, "Creating replicated queue " <<
values["name"].asString() << " (in catch-up)");
if (!broker.createQueue(
values["name"].asString(),
values["durable"].asBool(),
@@ -163,9 +185,10 @@ void NodeClone::route(Deliverable& msg,
args,
""/*TODO: who is the user?*/,
""/*TODO: what should we use as connection
id?*/).second) {
- QPID_LOG(warning, "Propagatable queue " <<
values["name"] << " already exists");
+ QPID_LOG(warning, "Replicated queue " <<
values["name"] << " already exists (in catch-up)");
}
} else if (type == "exchange") {
+ QPID_LOG(debug, "Creating replicated exchange " <<
values["name"].asString() << " (in catch-up)");
if (!broker.createExchange(
values["name"].asString(),
values["type"].asString(),
@@ -174,18 +197,18 @@ void NodeClone::route(Deliverable& msg,
args,
""/*TODO: who is the user?*/,
""/*TODO: what should we use as connection
id?*/).second) {
- QPID_LOG(warning, "Propagatable queue " <<
values["qName"] << " already exists");
+ QPID_LOG(warning, "Replicated exchange " <<
values["qName"] << " already exists (in catch-up)");
}
} else {
- QPID_LOG(warning, "Ignoring unknow object class: " <<
type);
+ QPID_LOG(warning, "Replicator ignoring unexpected
class: " << type);
}
}
}
} else {
- QPID_LOG(debug, "Dropping QMFv2 message with headers: " <<
*headers);
+ QPID_LOG(warning, "Replicator ignoring QMFv2 message with headers:
" << *headers);
}
} else {
- QPID_LOG(warning, "Ignoring message which is not a valid QMFv2 event
or query response");
+ QPID_LOG(warning, "Replicator ignoring message which is not a QMFv2
event or query response");
}
}
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.h?rev=1233634&r1=1233633&r2=1233634&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/NodeClone.h Thu Jan 19
23:02:12 2012
@@ -23,6 +23,8 @@
*/
#include "qpid/broker/Exchange.h"
+// FIXME aconway 2011-11-17: relocate to ../ha
+
namespace qpid {
namespace broker {
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/cluster/Connection.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/cluster/Connection.h?rev=1233634&r1=1233633&r2=1233634&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/cluster/Connection.h Thu Jan 19
23:02:12 2012
@@ -209,6 +209,8 @@ class Connection :
void queueDequeueSincePurgeState(const std::string&, uint32_t);
+ bool isAnnounced() const { return announced; }
+
private:
struct NullFrameHandler : public framing::FrameHandler {
void handle(framing::AMQFrame&) {}
Modified:
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=1233634&r1=1233633&r2=1233634&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
Thu Jan 19 23:02:12 2012
@@ -97,7 +97,7 @@ void OutputInterceptor::deliverDoOutput(
}
void OutputInterceptor::sendDoOutput(size_t newLimit, const
sys::Mutex::ScopedLock&) {
- if (parent.isLocal() && !sentDoOutput && !closing) {
+ if (parent.isLocal() && !sentDoOutput && !closing && parent.isAnnounced())
{
sentDoOutput = true;
parent.getCluster().getMulticast().mcastControl(
ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), newLimit),
Added: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1233634&view=auto
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp (added)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp Thu Jan 19
23:02:12 2012
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Backup.h"
+#include "Settings.h"
+#include "qpid/Url.h"
+#include "qpid/broker/Broker.h"
+
+namespace qpid {
+namespace ha {
+
+Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) {
+ // Create a link to replicate wiring
+ if (s.brokerUrl != "dummy") {
+ Url url(s.brokerUrl);
+ QPID_LOG(info, "HA backup broker connecting to: " << url);
+
+ std::string protocol = url[0].protocol.empty() ? "tcp" :
url[0].protocol;
+ broker.getLinks().declare( // Declare the link
+ url[0].host, url[0].port, protocol,
+ false, // durable
+ s.mechanism, s.username, s.password);
+
+ broker.getLinks().declare( // Declare the bridge
+ url[0].host, url[0].port,
+ false, // durable
+ "qpid.node-cloner", // src
+ "qpid.node-cloner", // dest
+ "x", // key
+ false, // isQueue
+ false, // isLocal
+ "", // id/tag
+ "", // excludes
+ false, // dynamic
+ 0); // sync?
+ }
+ // FIXME aconway 2011-11-17: need to enhance the link code to
+ // handle discovery of the primary broker and fail-over correctly.
+}
+
+}} // namespace qpid::ha
Propchange: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.h (from r1233633,
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Settings.h)
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.h?p2=qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.h&p1=qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Settings.h&r1=1233633&r2=1233634&rev=1233634&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Settings.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.h Thu Jan 19 23:02:12
2012
@@ -1,5 +1,5 @@
-#ifndef QPID_HA_SETTINGS_H
-#define QPID_HA_SETTINGS_H
+#ifndef QPID_HA_BACKUP_H
+#define QPID_HA_BACKUP_H
/*
*
@@ -22,27 +22,29 @@
*
*/
-#include <string>
+#include "Settings.h"
+#include "qpid/Url.h"
namespace qpid {
-namespace ha {
+namespace broker {
+class Broker;
+}
-using std::string;
+namespace ha {
+class Settings;
/**
- * Configurable settings for HA.
+ * State associated with a backup broker. Manages connections to primary.
*/
-class Settings
+class Backup
{
public:
- Settings() : enabled(false) {}
- bool enabled;
- string status; // primary, backup, solo
- string clientUrl;
- string brokerUrl;
- string username, password, mechanism;
+ Backup(broker::Broker&, const Settings&);
+
private:
+ broker::Broker& broker;
+ Settings settings;
};
}} // namespace qpid::ha
-#endif /*!QPID_HA_SETTINGS_H*/
+#endif /*!QPID_HA_BACKUP_H*/
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1233634&r1=1233633&r2=1233634&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp Thu Jan 19
23:02:12 2012
@@ -18,10 +18,14 @@
* under the License.
*
*/
+#include "Backup.h"
#include "HaBroker.h"
+#include "Settings.h"
+#include "qpid/Exception.h"
#include "qpid/broker/Broker.h"
#include "qpid/management/ManagementAgent.h"
#include "qmf/org/apache/qpid/ha/Package.h"
+#include "qpid/log/Statement.h"
namespace qpid {
namespace ha {
@@ -30,7 +34,22 @@ namespace _qmf = ::qmf::org::apache::qpi
using namespace management;
using namespace std;
-HaBroker::HaBroker(broker::Broker& b) : broker(b), mgmtObject(0) {
+namespace {
+Url url(const std::string& s, const std::string& id) {
+ try {
+ return Url(s);
+ } catch (const std::exception& e) {
+ throw Exception(Msg() << "Invalid URL for " << id << ": '" << s <<
"'");
+ }
+}
+} // namespace
+
+HaBroker::HaBroker(broker::Broker& b, const Settings& s)
+ : broker(b),
+ clientUrl(url(s.clientUrl, "ha-client-url")),
+ brokerUrl(url(s.brokerUrl, "ha-broker-url")),
+ mgmtObject(0)
+{
ManagementAgent* ma = broker.getManagementAgent();
if (ma) {
_qmf::Package packageInit(ma);
@@ -39,8 +58,13 @@ HaBroker::HaBroker(broker::Broker& b) :
mgmtObject->set_status("solo");
ma->addObject(mgmtObject);
}
+ QPID_LOG(notice, "HA broker initialized, client-url=" << clientUrl
+ << ", broker-url=" << brokerUrl);
+ backup.reset(new Backup(broker, s));
}
+HaBroker::~HaBroker() {}
+
Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args&
args, string&) {
switch (methodId) {
case _qmf::HaBroker::METHOD_SETSTATUS: {
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.h?rev=1233634&r1=1233633&r2=1233634&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.h Thu Jan 19
23:02:12 2012
@@ -22,9 +22,9 @@
*
*/
+#include "qpid/Url.h"
#include "qmf/org/apache/qpid/ha/HaBroker.h"
#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetStatus.h"
-
#include "qpid/management/Manageable.h"
namespace qpid {
@@ -32,6 +32,8 @@ namespace broker {
class Broker;
}
namespace ha {
+class Settings;
+class Backup;
/**
* HA state and actions associated with a broker.
@@ -41,7 +43,8 @@ namespace ha {
class HaBroker : public management::Manageable
{
public:
- HaBroker(broker::Broker&);
+ HaBroker(broker::Broker&, const Settings&);
+ ~HaBroker();
// Implement Manageable.
qpid::management::ManagementObject* GetManagementObject() const { return
mgmtObject; }
@@ -50,6 +53,8 @@ class HaBroker : public management::Mana
private:
broker::Broker& broker;
+ Url clientUrl, brokerUrl;
+ std::auto_ptr<Backup> backup;
qmf::org::apache::qpid::ha::HaBroker* mgmtObject;
};
}} // namespace qpid::ha
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaPlugin.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaPlugin.cpp?rev=1233634&r1=1233633&r2=1233634&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaPlugin.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaPlugin.cpp Thu Jan 19
23:02:12 2012
@@ -57,7 +57,7 @@ struct HaPlugin : public Plugin {
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
if (broker && settings.enabled) {
QPID_LOG(info, "HA plugin enabled");
- haBroker.reset(new ha::HaBroker(*broker));
+ haBroker.reset(new ha::HaBroker(*broker, settings));
} else
QPID_LOG(info, "HA plugin disabled");
}
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Settings.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Settings.h?rev=1233634&r1=1233633&r2=1233634&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Settings.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Settings.h Thu Jan 19
23:02:12 2012
@@ -37,7 +37,6 @@ class Settings
public:
Settings() : enabled(false) {}
bool enabled;
- string status; // primary, backup, solo
string clientUrl;
string brokerUrl;
string username, password, mechanism;
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/tests/brokertest.py
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/tests/brokertest.py?rev=1233634&r1=1233633&r2=1233634&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/tests/brokertest.py Thu Jan 19
23:02:12 2012
@@ -444,6 +444,7 @@ class BrokerTest(TestCase):
# Environment settings.
qpidd_exec = os.path.abspath(checkenv("QPIDD_EXEC"))
cluster_lib = os.getenv("CLUSTER_LIB")
+ ha_lib = os.getenv("HA_LIB")
xml_lib = os.getenv("XML_LIB")
qpid_config_exec = os.getenv("QPID_CONFIG_EXEC")
qpid_route_exec = os.getenv("QPID_ROUTE_EXEC")
Added: qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py?rev=1233634&view=auto
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py (added)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py Thu Jan 19
23:02:12 2012
@@ -0,0 +1,107 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os, signal, sys, time, imp, re, subprocess, glob, random, logging,
shutil
+from qpid.messaging import Message, NotFound
+from brokertest import *
+from threading import Thread, Lock, Condition
+from logging import getLogger
+
+
+log = getLogger("qpid.ha-tests")
+
+class ShortTests(BrokerTest):
+ """Short HA functionality tests."""
+
+ def ha_broker(self, args=[], client_url="dummy", broker_url="dummy",
**kwargs):
+ assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
+ return Broker(self, args=["--load-module", BrokerTest.ha_lib,
+ "--ha-enable=yes",
+ "--ha-client-url", client_url,
+ "--ha-broker-url", broker_url,
+ ] + args,
+ **kwargs)
+
+ def setup_wiring(self, primary, backup):
+ cmd="qpid-route route add %s %s qpid.node-cloner x"%(backup, primary)
+ self.assertEqual(0, os.system(cmd))
+
+ def setup_replication(self, primary, backup, queue):
+ self.assertEqual(0,os.system("qpid-route --ack 1 queue add %s %s
qpid.replicator-%s %s"%(backup, primary, queue, queue)))
+
+ # FIXME aconway 2011-11-15: work around async replication.
+ def wait(self, session, address):
+ def check():
+ try:
+ session.receiver(address)
+ return True
+ except NotFound: return False
+ assert retry(check), "Timed out waiting for %s"%(address)
+
+ def assert_missing(self,session, address):
+ try:
+ session.receiver(a)
+ self.fail("Should not have been replicated: %s"%(address))
+ except NotFound: pass
+
+ def test_replicate_wiring(self):
+
queue="%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}"
+
exchange="%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s},
type:'fanout'},x-bindings:[{exchange:'%s',queue:'%s'}]}}"
+
+ # Create some wiring before starting the backup, to test catch-up
+ primary = self.ha_broker(name="primary")
+ s = primary.connect().session()
+ s.sender(queue%("q1", "all")).send(Message("1"))
+ s.sender(queue%("q2", "wiring")).send(Message("2"))
+ s.sender(queue%("q3", "none")).send(Message("3"))
+ s.sender(exchange%("e1", "all", "e1", "q2")).send(Message("4"))
+
+ # Create some after starting backup, test steady-state replication
+ backup = self.ha_broker(name="backup", broker_url=primary.host_port())
+ s.sender(queue%("q01", "all")).send(Message("01"))
+ s.sender(queue%("q02", "wiring")).send(Message("02"))
+ s.sender(queue%("q03", "none")).send(Message("03"))
+ s.sender(exchange%("e01", "all", "e01", "q02")).send(Message("04"))
+
+ # Verify replication
+ # FIXME aconway 2011-11-18: We should kill primary here and fail over.
+ s = backup.connect().session()
+ for a in ["q01", "q02", "e01"]: self.wait(s,a)
+ # FIXME aconway 2011-11-18: replicate messages
+# self.assert_browse(s, "q01", ["01", "04", "e01"])
+# self.assert_browse(s, "q02", []) # wiring only
+# self.assert_missing(s,"q03")
+ s.sender("e01").send(Message("e01")) # Verify bind
+ # FIXME aconway 2011-11-18: FIXME replicate bindings
+ # self.assert_browse(s, "q02", ["e01"])
+
+ for a in ["q1", "q2", "e1"]: self.wait(s,a)
+ # FIXME aconway 2011-11-18: replicate messages
+# self.assert_browse(s, "q1", ["1", "4", "e1"])
+# self.assert_browse(s, "q2", []) # wiring only
+# self.assert_missing(s,"q3")
+ s.sender("e1").send(Message("e1")) # Verify bind
+ # FIXME aconway 2011-11-18: FIXME replicate bindings
+ # self.assert_browse(s, "q2", ["e1"])
+
+
+if __name__ == "__main__":
+ shutil.rmtree("brokertest.tmp", True)
+ os.execvp("qpid-python-test", ["qpid-python-test", "-m", "ha_tests"] +
sys.argv[1:])
Propchange: qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py
------------------------------------------------------------------------------
svn:executable = *
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/tests/test_env.sh.in
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/tests/test_env.sh.in?rev=1233634&r1=1233633&r2=1233634&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/tests/test_env.sh.in (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/tests/test_env.sh.in Thu Jan 19
23:02:12 2012
@@ -63,6 +63,7 @@ export TEST_STORE_LIB=$testmoduledir/tes
exportmodule() { test -f $moduledir/$2 && eval "export $1=$moduledir/$2"; }
exportmodule ACL_LIB acl.so
exportmodule CLUSTER_LIB cluster.so
+exportmodule HA_LIB ha.so
exportmodule REPLICATING_LISTENER_LIB replicating_listener.so
exportmodule REPLICATION_EXCHANGE_LIB replication_exchange.so
exportmodule SSLCONNECTOR_LIB sslconnector.so
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]