Author: aconway
Date: Thu Oct 18 19:42:14 2012
New Revision: 1399813
URL: http://svn.apache.org/viewvc?rev=1399813&view=rev
Log:
Bug 867030 - QPID-4374: Use configurable credit window for HA backup
subscriptions (Jason Dillaman)
Modified:
qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/HaPlugin.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h
qpid/trunk/qpid/cpp/src/qpid/ha/Settings.h
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1399813&r1=1399812&r2=1399813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Thu Oct 18 19:42:14 2012
@@ -312,9 +312,14 @@ void BrokerReplicator::initializeBridge(
peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_EVENT_BROKER,
FieldTable());
peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_EVENT_HA,
FieldTable());
//subscribe to the queue
- peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0,
FieldTable());
- peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
- peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+ FieldTable arguments;
+ arguments.setInt(QueueReplicator::QPID_SYNC_FREQUENCY, 1); // FIXME aconway 2012-05-22: optimize?
+ peer.getMessage().subscribe(
+ queueName, args.i_dest, 1/*accept-none*/, 0/*pre-acquired*/,
+ false/*exclusive*/, "", 0, arguments);
+ peer.getMessage().setFlowMode(args.i_dest, 1); // Window
+ peer.getMessage().flow(args.i_dest, 0,
haBroker.getSettings().getFlowMessages());
+ peer.getMessage().flow(args.i_dest, 1,
haBroker.getSettings().getFlowBytes());
// Issue a query request for queues, exchanges, bindings and the habroker
// using event queue as the reply-to address
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaPlugin.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaPlugin.cpp?rev=1399813&r1=1399812&r2=1399813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaPlugin.cpp Thu Oct 18 19:42:14 2012
@@ -48,6 +48,10 @@ struct Options : public qpid::Options {
"Authentication mechanism for connections between HA brokers")
("ha-backup-timeout", optValue(settings.backupTimeout, "SECONDS"),
"Maximum time to wait for an expected backup to connect and become ready.")
+ ("ha-flow-messages", optValue(settings.flowMessages, "N"),
+ "Flow control message count limit for replication, 0 means no limit")
+ ("ha-flow-bytes", optValue(settings.flowBytes, "N"),
+ "Flow control byte limit for replication, 0 means no limit")
;
}
};
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1399813&r1=1399812&r2=1399813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Thu Oct 18 19:42:14 2012
@@ -22,6 +22,7 @@
#include "HaBroker.h"
#include "QueueReplicator.h"
#include "ReplicatingSubscription.h"
+#include "Settings.h"
#include "qpid/broker/Bridge.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Link.h"
@@ -39,7 +40,6 @@
namespace {
const std::string QPID_REPLICATOR_("qpid.replicator-");
const std::string TYPE_NAME("qpid.queue-replicator");
-const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency");
}
namespace qpid {
@@ -52,6 +52,7 @@ using sys::Mutex;
const std::string QPID_HA_EVENT_PREFIX("qpid.ha-");
const std::string
QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA_EVENT_PREFIX+"dequeue");
const std::string
QueueReplicator::POSITION_EVENT_KEY(QPID_HA_EVENT_PREFIX+"position");
+const std::string QueueReplicator::QPID_SYNC_FREQUENCY("qpid.sync_frequency");
std::string QueueReplicator::replicatorName(const std::string& queueName) {
return QPID_REPLICATOR_ + queueName;
@@ -107,7 +108,8 @@ QueueReplicator::QueueReplicator(HaBroke
: Exchange(replicatorName(q->getName()), 0, q->getBroker()),
haBroker(hb),
logPrefix("Backup queue "+q->getName()+": "),
- queue(q), link(l), brokerInfo(hb.getBrokerInfo()), subscribed(false)
+ queue(q), link(l), brokerInfo(hb.getBrokerInfo()), subscribed(false),
+ settings(hb.getSettings())
{
args.setString(QPID_REPLICATE, printable(NONE).str());
Uuid uuid(true);
@@ -165,23 +167,21 @@ void QueueReplicator::initializeBridge(B
if (!queue) return; // Already destroyed
AMQP_ServerProxy peer(sessionHandler.out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge&
args(bridge.getArgs());
- FieldTable settings;
- settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
- settings.setInt(QPID_SYNC_FREQUENCY, 1); // FIXME aconway 2012-05-22: optimize?
- settings.setInt(ReplicatingSubscription::QPID_BACK,
- queue->getPosition());
- settings.setTable(ReplicatingSubscription::QPID_BROKER_INFO,
- brokerInfo.asFieldTable());
+ FieldTable arguments;
+ arguments.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION,
1);
+ arguments.setInt(QPID_SYNC_FREQUENCY, 1); // FIXME aconway 2012-05-22: optimize?
+ arguments.setInt(ReplicatingSubscription::QPID_BACK, queue->getPosition());
+ arguments.setTable(ReplicatingSubscription::QPID_BROKER_INFO,brokerInfo.asFieldTable());
SequenceNumber front, back;
queue->getRange(front, back, broker::REPLICATOR);
- if (front <= back) settings.setInt(ReplicatingSubscription::QPID_FRONT,
front);
+ if (front <= back) arguments.setInt(ReplicatingSubscription::QPID_FRONT,
front);
try {
peer.getMessage().subscribe(
args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/,
- false/*exclusive*/, "", 0, settings);
- // FIXME aconway 2012-05-22: use a finite credit window?
- peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
- peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
+ false/*exclusive*/, "", 0, arguments);
+ peer.getMessage().setFlowMode(getName(), 1); // Window
+ peer.getMessage().flow(getName(), 0, settings.getFlowMessages());
+ peer.getMessage().flow(getName(), 1, settings.getFlowBytes());
}
catch(const exception& e) {
QPID_LOG(error, QPID_MSG(logPrefix + "Cannot connect to primary: " <<
e.what()));
@@ -190,7 +190,7 @@ void QueueReplicator::initializeBridge(B
qpid::Address primary;
link->getRemoteAddress(primary);
QPID_LOG(info, logPrefix << "Connected to " << primary << "(" <<
bridgeName << ")");
- QPID_LOG(trace, logPrefix << "Subscription settings: " << settings);
+ QPID_LOG(trace, logPrefix << "Subscription arguments: " << arguments);
}
namespace {
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h?rev=1399813&r1=1399812&r2=1399813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h Thu Oct 18 19:42:14 2012
@@ -41,6 +41,7 @@ class Deliverable;
namespace ha {
class HaBroker;
+class Settings;
/**
* Exchange created on a backup broker to replicate a queue on the primary.
@@ -57,6 +58,8 @@ class QueueReplicator : public broker::E
public:
static const std::string DEQUEUE_EVENT_KEY;
static const std::string POSITION_EVENT_KEY;
+ static const std::string QPID_SYNC_FREQUENCY;
+
static std::string replicatorName(const std::string& queueName);
static bool isReplicatorName(const std::string&);
@@ -101,6 +104,7 @@ class QueueReplicator : public broker::E
boost::shared_ptr<broker::Bridge> bridge;
BrokerInfo brokerInfo;
bool subscribed;
+ const Settings& settings;
};
}} // namespace qpid::ha
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Settings.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Settings.h?rev=1399813&r1=1399812&r2=1399813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Settings.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Settings.h Thu Oct 18 19:42:14 2012
@@ -23,6 +23,7 @@
*/
#include "types.h"
+#include "qpid/sys/IntegerTypes.h"
#include <string>
namespace qpid {
@@ -34,7 +35,8 @@ namespace ha {
class Settings
{
public:
- Settings() : cluster(false), replicateDefault(NONE), backupTimeout(5)
+ Settings() : cluster(false), replicateDefault(NONE), backupTimeout(5),
+ flowMessages(100), flowBytes(0)
{}
bool cluster; // True if we are a cluster member.
@@ -43,7 +45,13 @@ class Settings
Enum<ReplicateLevel> replicateDefault;
std::string username, password, mechanism;
double backupTimeout;
- private:
+
+ uint32_t flowMessages, flowBytes;
+
+ static const uint32_t NO_LIMIT=0xFFFFFFFF;
+ static uint32_t flowValue(uint32_t n) { return n ? n : NO_LIMIT; }
+ uint32_t getFlowMessages() const { return flowValue(flowMessages); }
+ uint32_t getFlowBytes() const { return flowValue(flowBytes); }
};
}} // namespace qpid::ha
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]