Author: aconway Date: Mon May 28 18:24:43 2012 New Revision: 1343351 URL: http://svn.apache.org/viewvc?rev=1343351&view=rev Log: QPID-3603: Better log messages for HA code.
Identify host name of backup in ReplicatingSubscription logs. Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.h qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp qpid/trunk/qpid/cpp/src/qpid/ha/LogPrefix.cpp qpid/trunk/qpid/cpp/src/qpid/ha/LogPrefix.h qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp?rev=1343351&r1=1343350&r2=1343351&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp Mon May 28 18:24:43 2012 @@ -31,6 +31,7 @@ namespace ha { namespace { std::string SYSTEM_ID="system-id"; std::string HOST_NAME="host-name"; +std::string PORT="port"; std::string STATUS="status"; } @@ -41,6 +42,7 @@ FieldTable BrokerInfo::asFieldTable() co FieldTable ft; ft.setString(SYSTEM_ID, systemId.str()); ft.setString(HOST_NAME, hostName); + ft.setInt(PORT, port); ft.setInt(STATUS, status); return ft; } @@ -48,11 +50,12 @@ FieldTable BrokerInfo::asFieldTable() co void BrokerInfo::assign(const FieldTable& ft) { systemId = Uuid(ft.getAsString(SYSTEM_ID)); hostName = ft.getAsString(HOST_NAME); + port = ft.getAsInt(PORT); status = BrokerStatus(ft.getAsInt(STATUS)); } std::ostream& operator<<(std::ostream& o, const BrokerInfo& b) { - return o << b.getHostName() << "(" << b.getSystemId() + return o << b.getHostName() << ":" << b.getPort() << "(" << b.getSystemId() << "," << printable(b.getStatus()) << ")"; } Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.h?rev=1343351&r1=1343350&r2=1343351&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.h Mon May 28 18:24:43 2012 @@ -23,6 +23,7 @@ */ #include "Enum.h" +#include "qpid/Url.h" #include "qpid/framing/Uuid.h" #include "qpid/framing/FieldTable.h" #include <string> @@ -37,8 +38,8 @@ namespace ha { class BrokerInfo { public: - BrokerInfo(const std::string& host, const framing::Uuid& id) : - hostName(host), systemId(id) {} + BrokerInfo(const std::string& host, uint16_t port_, const framing::Uuid& id) : + hostName(host), port(port_), systemId(id) {} BrokerInfo(const framing::FieldTable& ft) { assign(ft); } framing::FieldTable asFieldTable() const; @@ -47,10 +48,13 @@ class BrokerInfo framing::Uuid getSystemId() const { return systemId; } std::string getHostName() const { return hostName; } BrokerStatus getStatus() const { return status; } + uint16_t getPort() const { return port; } + void setStatus(BrokerStatus s) { status = s; } private: std::string hostName; + uint16_t port; framing::Uuid systemId; BrokerStatus status; }; Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1343351&r1=1343350&r2=1343351&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Mon May 28 18:24:43 2012 @@ -233,7 +233,7 @@ void BrokerReplicator::initializeBridge( sendQuery(ORG_APACHE_QPID_BROKER, QUEUE, queueName, sessionHandler); sendQuery(ORG_APACHE_QPID_BROKER, EXCHANGE, queueName, sessionHandler); sendQuery(ORG_APACHE_QPID_BROKER, BINDING, queueName, sessionHandler); - QPID_LOG(debug, logPrefix << "opened configuration bridge: " << queueName); + QPID_LOG(debug, logPrefix << "Opened configuration bridge: " << queueName); } void BrokerReplicator::route(Deliverable& msg) { @@ -271,7 +271,7 @@ void BrokerReplicator::route(Deliverable } } } catch (const std::exception& e) { - QPID_LOG(critical, logPrefix << "configuration failed: " << e.what() + QPID_LOG(critical, logPrefix << "Configuration failed: " << e.what() << ": while handling: " << list); throw; } @@ -290,7 +290,7 @@ void BrokerReplicator::doEventQueueDecla // The queue was definitely created on the primary. if (broker.getQueues().find(name)) { broker.getQueues().destroy(name); - QPID_LOG(warning, logPrefix << "queue declare event, replaced exsiting: " + QPID_LOG(warning, logPrefix << "Queue declare event, replaced exsiting: " << name); } std::pair<boost::shared_ptr<Queue>, bool> result = @@ -304,7 +304,7 @@ void BrokerReplicator::doEventQueueDecla values[USER].asString(), values[RHOST].asString()); assert(result.second); // Should be true since we destroyed existing queue above - QPID_LOG(debug, logPrefix << "queue declare event: " << name); + QPID_LOG(debug, logPrefix << "Queue declare event: " << name); startQueueReplicator(result.first); } } @@ -323,9 +323,9 @@ void BrokerReplicator::doEventQueueDelet string name = values[QNAME].asString(); boost::shared_ptr<Queue> queue = broker.getQueues().find(name); if (!queue) { - QPID_LOG(warning, logPrefix << "queue delete event, does not exist: " << name); + QPID_LOG(warning, logPrefix << "Queue delete event, does not exist: " << name); } else if (!haBroker.replicateLevel(queue->getSettings())) { - QPID_LOG(warning, logPrefix << "queue delete event, not replicated: " << name); + QPID_LOG(warning, logPrefix << "Queue delete event, not replicated: " << name); } else { boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(name); if (qr) { @@ -336,7 +336,7 @@ void BrokerReplicator::doEventQueueDelet broker.getExchanges().destroy(qr->getName()); } broker.deleteQueue(name, values[USER].asString(), values[RHOST].asString()); - QPID_LOG(debug, logPrefix << "queue delete event: " << name); + QPID_LOG(debug, logPrefix << "Queue delete event: " << name); } } @@ -351,7 +351,7 @@ void BrokerReplicator::doEventExchangeDe // The exchange was definitely created on the primary. if (broker.getExchanges().find(name)) { broker.getExchanges().destroy(name); - QPID_LOG(warning, logPrefix << "exchange declare event, replaced exsiting: " << name) + QPID_LOG(warning, logPrefix << "Exchange declare event, replaced exsiting: " << name) } std::pair<boost::shared_ptr<Exchange>, bool> result = broker.createExchange( @@ -363,7 +363,7 @@ void BrokerReplicator::doEventExchangeDe values[USER].asString(), values[RHOST].asString()); assert(result.second); - QPID_LOG(debug, logPrefix << "exchange declare event: " << name); + QPID_LOG(debug, logPrefix << "Exchange declare event: " << name); } } @@ -371,11 +371,11 @@ void BrokerReplicator::doEventExchangeDe string name = values[EXNAME].asString(); boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name); if (!exchange) { - QPID_LOG(warning, logPrefix << "exchange delete event, does not exist: " << name); + QPID_LOG(warning, logPrefix << "Exchange delete event, does not exist: " << name); } else if (!haBroker.replicateLevel(exchange->getArgs())) { - QPID_LOG(warning, logPrefix << "exchange delete event, not replicated: " << name); + QPID_LOG(warning, logPrefix << "Exchange delete event, not replicated: " << name); } else { - QPID_LOG(debug, logPrefix << "exchange delete event:" << name); + QPID_LOG(debug, logPrefix << "Exchange delete event:" << name); broker.deleteExchange( name, values[USER].asString(), @@ -397,7 +397,7 @@ void BrokerReplicator::doEventBind(Varia amqp_0_10::translate(asMapVoid(values[ARGS]), args); string key = values[KEY].asString(); exchange->bind(queue, key, &args); - QPID_LOG(debug, logPrefix << "bind event: exchange=" << exchange->getName() + QPID_LOG(debug, logPrefix << "Bind event: exchange=" << exchange->getName() << " queue=" << queue->getName() << " key=" << key); } @@ -417,7 +417,7 @@ void BrokerReplicator::doEventUnbind(Var amqp_0_10::translate(asMapVoid(values[ARGS]), args); string key = values[KEY].asString(); exchange->unbind(queue, key, &args); - QPID_LOG(debug, logPrefix << "unbind event: exchange=" << exchange->getName() + QPID_LOG(debug, logPrefix << "Unbind event: exchange=" << exchange->getName() << " queue=" << queue->getName() << " key=" << key); } @@ -444,7 +444,7 @@ void BrokerReplicator::doResponseQueue(V ""/*TODO: what should we use as connection id?*/); // It is normal for the queue to already exist if we are failing over. if (result.second) startQueueReplicator(result.first); - QPID_LOG(debug, logPrefix << "queue response: " << name); + QPID_LOG(debug, logPrefix << "Queue response: " << name); } void BrokerReplicator::doResponseExchange(Variant::Map& values) { @@ -461,9 +461,9 @@ void BrokerReplicator::doResponseExchang ""/*TODO: who is the user?*/, ""/*TODO: what should we use as connection id?*/).second) { - QPID_LOG(debug, logPrefix << "exchange response: " << values[NAME].asString()); + QPID_LOG(debug, logPrefix << "Exchange response: " << values[NAME].asString()); } else { - QPID_LOG(warning, logPrefix << "exchange response, already exists: " << + QPID_LOG(warning, logPrefix << "Exchange response, already exists: " << values[NAME].asString()); } } @@ -503,7 +503,7 @@ void BrokerReplicator::doResponseBind(Va amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args); string key = values[KEY].asString(); exchange->bind(queue, key, &args); - QPID_LOG(debug, logPrefix << "bind response: exchange=" << exchange->getName() + QPID_LOG(debug, logPrefix << "Bind response: exchange=" << exchange->getName() << " queue=" << queue->getName() << " key=" << key); } @@ -545,7 +545,7 @@ void BrokerReplicator::startQueueReplica { if (haBroker.replicateLevel(queue->getSettings()) == ALL) { boost::shared_ptr<QueueReplicator> qr( - new QueueReplicator(LogPrefix(haBroker, queue->getName()), queue, link)); + new QueueReplicator(haBroker, queue, link)); if (!broker.getExchanges().registerExchange(qr)) throw Exception(QPID_MSG("Duplicate queue replicator " << qr->getName())); qr->activate(); Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1343351&r1=1343350&r2=1343351&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp Mon May 28 18:24:43 2012 @@ -55,6 +55,8 @@ HaBroker::HaBroker(broker::Broker& b, co status(STANDALONE), excluder(new ConnectionExcluder(logPrefix, broker.getSystem()->getSystemId())), brokerInfo(broker.getSystem()->getNodeName(), + // TODO aconway 2012-05-24: other transports? + broker.getPort(broker::Broker::TCP_TRANSPORT), broker.getSystem()->getSystemId()) { @@ -180,8 +182,7 @@ Manageable::status_t HaBroker::Managemen boost::shared_ptr<broker::Link> link = result.first; link->setUrl(url); // Create a queue replicator - boost::shared_ptr<QueueReplicator> qr( - new QueueReplicator(LogPrefix(*this, queue->getName()), queue, link)); + boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(*this, queue, link)); qr->activate(); broker.getExchanges().registerExchange(qr); break; Modified: qpid/trunk/qpid/cpp/src/qpid/ha/LogPrefix.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/LogPrefix.cpp?rev=1343351&r1=1343350&r2=1343351&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/ha/LogPrefix.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/ha/LogPrefix.cpp Mon May 28 18:24:43 2012 @@ -25,22 +25,26 @@ namespace qpid { namespace ha { -LogPrefix::LogPrefix(HaBroker& hb, const std::string& queue) : haBroker(&hb), status(0) { - if (queue.size()) tail = " queue " + queue; +LogPrefix::LogPrefix(HaBroker& hb, const std::string& msg) : haBroker(&hb), status(0) { + if (msg.size()) setMessage(msg); } -LogPrefix::LogPrefix(LogPrefix& lp, const std::string& queue) +LogPrefix::LogPrefix(LogPrefix& lp, const std::string& msg) : haBroker(lp.haBroker), status(0) { - if (queue.size()) tail = " queue " + queue; + if (msg.size()) setMessage(msg); } LogPrefix::LogPrefix(BrokerStatus& s) : haBroker(0), status(&s) {} +void LogPrefix::setMessage(const std::string& msg) { + tail = " "+msg+":"; +} + std::ostream& operator<<(std::ostream& o, const LogPrefix& l) { return o << "HA(" << printable(l.status ? *l.status : l.haBroker->getStatus()) - << ")" << l.tail << ": "; + << ")" << l.tail << " "; } }} // namespace qpid::ha Modified: qpid/trunk/qpid/cpp/src/qpid/ha/LogPrefix.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/LogPrefix.h?rev=1343351&r1=1343350&r2=1343351&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/ha/LogPrefix.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/ha/LogPrefix.h Mon May 28 18:24:43 2012 @@ -40,10 +40,11 @@ class LogPrefix /** For use by all classes other than HaBroker */ LogPrefix(HaBroker& hb, const std::string& queue=std::string()); LogPrefix(LogPrefix& lp, const std::string& queue); - /** For use by the HaBroker itself. */ LogPrefix(BrokerStatus&); + void setMessage(const std::string&); + private: HaBroker* haBroker; BrokerStatus* status; Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1343351&r1=1343350&r2=1343351&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Mon May 28 18:24:43 2012 @@ -21,6 +21,7 @@ #include "Counter.h" #include "QueueReplicator.h" +#include "HaBroker.h" #include "ReplicatingSubscription.h" #include "qpid/broker/Bridge.h" #include "qpid/broker/Broker.h" @@ -59,14 +60,15 @@ bool QueueReplicator::isEventKey(const s return ret; } -QueueReplicator::QueueReplicator(const LogPrefix& lp, +QueueReplicator::QueueReplicator(HaBroker& hb, boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l) : Exchange(replicatorName(q->getName()), 0, q->getBroker()), - logPrefix(lp), queue(q), link(l) + haBroker(hb), logPrefix(hb), queue(q), link(l) { - framing::Uuid uuid(true); + Uuid uuid(true); bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str(); + logPrefix.setMessage(q->getName()); QPID_LOG(info, logPrefix << "Created"); } @@ -109,12 +111,15 @@ void QueueReplicator::deactivate() { // Called in a broker connection thread when the bridge is created. void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { sys::Mutex::ScopedLock l(lock); - framing::AMQP_ServerProxy peer(sessionHandler.out); + AMQP_ServerProxy peer(sessionHandler.out); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); - framing::FieldTable settings; + FieldTable settings; settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1); settings.setInt(QPID_SYNC_FREQUENCY, 1); // FIXME aconway 2012-05-22: optimize? - settings.setInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER, queue->getPosition()); + settings.setInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER, + queue->getPosition()); + settings.setTable(ReplicatingSubscription::QPID_BROKER_INFO, + haBroker.getBrokerInfo().asFieldTable()); SequenceNumber front; if (ReplicatingSubscription::getFront(*queue, front)) settings.setInt(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER, front); Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h?rev=1343351&r1=1343350&r2=1343351&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h Mon May 28 18:24:43 2012 @@ -42,6 +42,7 @@ class Deliverable; namespace ha { class Counter; +class HaBroker; /** * Exchange created on a backup broker to replicate a queue on the primary. @@ -62,7 +63,7 @@ class QueueReplicator : public broker::E /** Test if a string is an event key */ static bool isEventKey(const std::string key); - QueueReplicator(const LogPrefix&, + QueueReplicator(HaBroker&, boost::shared_ptr<broker::Queue> q, boost::shared_ptr<broker::Link> l); @@ -82,6 +83,7 @@ class QueueReplicator : public broker::E void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler); void dequeue(framing::SequenceNumber, const sys::Mutex::ScopedLock&); + HaBroker& haBroker; LogPrefix logPrefix; std::string bridgeName; sys::Mutex lock; Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1343351&r1=1343350&r2=1343351&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Mon May 28 18:24:43 2012 @@ -20,6 +20,7 @@ */ #include "ReplicatingSubscription.h" +#include "HaBroker.h" #include "Primary.h" #include "qpid/broker/Queue.h" #include "qpid/broker/SessionContext.h" @@ -40,6 +41,7 @@ using namespace std; const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.replicating-subscription"); const string ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER("qpid.high-sequence-number"); const string ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER("qpid.low-sequence-number"); +const string ReplicatingSubscription::QPID_BROKER_INFO("qpid.broker-info"); namespace { const string DOLLAR("$"); @@ -59,7 +61,7 @@ class DequeueRemover } void operator()(const QueuedMessage& message) { - if (message.position >= start && message.position <= end) { + if (message.position >= start && message.position <= end) { //i.e. message is within the intial range and has not been dequeued, //so remove it from the dequeues dequeues.remove(message.position); @@ -127,7 +129,7 @@ ReplicatingSubscription::Factory::create boost::shared_ptr<ReplicatingSubscription> rs; if (arguments.isSet(QPID_REPLICATING_SUBSCRIPTION)) { rs.reset(new ReplicatingSubscription( - LogPrefix(haBroker), + haBroker, parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments)); queue->addObserver(rs); @@ -173,7 +175,7 @@ ostream& operator<<(ostream& o, const Qu } ReplicatingSubscription::ReplicatingSubscription( - LogPrefix lp, + HaBroker& hb, SemanticState* parent, const string& name, Queue::shared_ptr queue, @@ -186,17 +188,24 @@ ReplicatingSubscription::ReplicatingSubs const framing::FieldTable& arguments ) : ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments), - logPrefix(lp, queue->getName()), + haBroker(hb), + logPrefix(hb), dummy(new Queue(mask(name))), ready(false) { try { - // FIXME aconway 2012-05-22: use hostname from brokerinfo - // Separate the remote part from a "local-remote" address for logging. - string address = parent->getSession().getConnection().getUrl(); - size_t i = address.find('-'); - if (i != string::npos) address = address.substr(i+1); - logSuffix = " (" + address + ")"; + // Set a log prefix message that identifies the remote broker. + // FIXME aconway 2012-05-24: use URL instead of host:port, include transport? + ostringstream os; + os << queue->getName() << "@"; + FieldTable ft; + if (arguments.getTable(ReplicatingSubscription::QPID_BROKER_INFO, ft)) { + BrokerInfo info(ft); + os << info.getHostName() << ":" << info.getPort(); + } + else + os << parent->getSession().getConnection().getUrl(); + logPrefix.setMessage(os.str()); QueueRange primary(*queue); QueueRange backup(arguments); @@ -228,16 +237,16 @@ ReplicatingSubscription::ReplicatingSubs << " backup range " << backup << " primary range " << primary << " position " << position - << " dequeues " << dequeues << logSuffix); + << " dequeues " << dequeues); } catch (const std::exception& e) { throw Exception(QPID_MSG(logPrefix << "Error setting up replication: " - << e.what() << logSuffix)); + << e.what())); } } ReplicatingSubscription::~ReplicatingSubscription() { - QPID_LOG(debug, logPrefix << "Detroyed replicating subscription" << logSuffix); + QPID_LOG(debug, logPrefix << "Detroyed replicating subscription"); } // Called in subscription's connection thread when the subscription is created. @@ -258,7 +267,7 @@ void ReplicatingSubscription::initialize } else { QPID_LOG(debug, logPrefix << "Backup subscription catching up from " - << position << " to " << readyPosition << logSuffix); + << position << " to " << readyPosition); } } @@ -267,7 +276,7 @@ bool ReplicatingSubscription::deliver(Qu try { // Add position events for the subscribed queue, not the internal event queue. if (qm.queue == getQueue().get()) { - QPID_LOG(trace, logPrefix << "replicating " << qm << logSuffix); + QPID_LOG(trace, logPrefix << "Replicating " << qm); { sys::Mutex::ScopedLock l(lock); assert(position == qm.position); @@ -296,8 +305,8 @@ bool ReplicatingSubscription::deliver(Qu else return ConsumerImpl::deliver(qm); // Message is for internal event queue. } catch (const std::exception& e) { - QPID_LOG(critical, logPrefix << "error replicating " << qm - << logSuffix << ": " << e.what()); + QPID_LOG(critical, logPrefix << "Error replicating " << qm + << ": " << e.what()); throw; } } @@ -305,7 +314,7 @@ bool ReplicatingSubscription::deliver(Qu void ReplicatingSubscription::setReady(const sys::Mutex::ScopedLock&) { if (ready) return; ready = true; - QPID_LOG(info, logPrefix << "Caught up at " << getPosition() << logSuffix); + QPID_LOG(info, logPrefix << "Caught up at " << getPosition()); // Notify Primary that a subscription is ready. if (Primary::get()) Primary::get()->readyReplica(getQueue()->getName()); } @@ -319,7 +328,7 @@ void ReplicatingSubscription::complete( { // Handle completions for the subscribed queue, not the internal event queue. if (qm.queue == getQueue().get()) { - QPID_LOG(trace, logPrefix << "completed " << qm << logSuffix); + QPID_LOG(trace, logPrefix << "Completed " << qm); Delayed::iterator i= delayed.find(qm.position); // The same message can be completed twice, by acknowledged and // dequeued, remove it from the set so it only gets completed @@ -337,7 +346,7 @@ void ReplicatingSubscription::complete( // Called in arbitrary connection thread *with the queue lock held* void ReplicatingSubscription::enqueued(const QueuedMessage& qm) { // Delay completion - QPID_LOG(trace, logPrefix << "delaying completion of " << qm << logSuffix); + QPID_LOG(trace, logPrefix << "Delaying completion of " << qm); qm.payload->getIngressCompletion().startCompleter(); { sys::Mutex::ScopedLock l(lock); @@ -350,7 +359,7 @@ void ReplicatingSubscription::enqueued(c void ReplicatingSubscription::cancelComplete( const Delayed::value_type& v, const sys::Mutex::ScopedLock&) { - QPID_LOG(trace, logPrefix << "cancel completed " << v.second << logSuffix); + QPID_LOG(trace, logPrefix << "Cancel completed " << v.second); v.second.payload->getIngressCompletion().finishCompleter(); } @@ -361,8 +370,8 @@ void ReplicatingSubscription::cancel() boost::dynamic_pointer_cast<QueueObserver>(shared_from_this())); { sys::Mutex::ScopedLock l(lock); - QPID_LOG(debug, logPrefix << "cancel backup subscription to " - << getQueue()->getName() << logSuffix); + QPID_LOG(debug, logPrefix << "Cancel backup subscription to " + << getQueue()->getName()); for_each(delayed.begin(), delayed.end(), boost::bind(&ReplicatingSubscription::cancelComplete, this, _1, boost::ref(l))); delayed.clear(); @@ -385,8 +394,7 @@ bool ReplicatingSubscription::hideDelete void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock&) { if (dequeues.empty()) return; - QPID_LOG(trace, logPrefix << "sending dequeues " << dequeues - << " from " << getQueue()->getName() << logSuffix); + QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues); string buf(dequeues.encodedSize(),'\0'); framing::Buffer buffer(&buf[0], buf.size()); dequeues.encode(buffer); @@ -401,7 +409,7 @@ void ReplicatingSubscription::sendDequeu void ReplicatingSubscription::dequeued(const QueuedMessage& qm) { { - QPID_LOG(trace, logPrefix << "dequeued " << qm << logSuffix); + QPID_LOG(trace, logPrefix << "Dequeued " << qm); sys::Mutex::ScopedLock l(lock); dequeues.add(qm.position); // If we have not yet sent this message to the backup, then @@ -415,8 +423,8 @@ void ReplicatingSubscription::dequeued(c void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, const sys::Mutex::ScopedLock&) { if (pos == backupPosition) return; // No need to send. - QPID_LOG(trace, logPrefix << "sending position " << pos - << ", was " << backupPosition << logSuffix); + QPID_LOG(trace, logPrefix << "Sending position " << pos + << ", was " << backupPosition); string buf(pos.encodedSize(),'\0'); framing::Buffer buffer(&buf[0], buf.size()); pos.encode(buffer); Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1343351&r1=1343350&r2=1343351&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Mon May 28 18:24:43 2012 @@ -76,8 +76,9 @@ class ReplicatingSubscription : public b static const std::string QPID_REPLICATING_SUBSCRIPTION; static const std::string QPID_HIGH_SEQUENCE_NUMBER; static const std::string QPID_LOW_SEQUENCE_NUMBER; + static const std::string QPID_BROKER_INFO; - // FIXME aconway 2012-05-23: these don't belong on ReplicatingSubscription + // TODO aconway 2012-05-23: these don't belong on ReplicatingSubscription /** Get position of front message on queue. *@return false if queue is empty. */ @@ -88,7 +89,7 @@ class ReplicatingSubscription : public b static bool getNext(broker::Queue&, framing::SequenceNumber from, framing::SequenceNumber& result); - ReplicatingSubscription(LogPrefix, + ReplicatingSubscription(HaBroker&, broker::SemanticState* parent, const std::string& name, boost::shared_ptr<broker::Queue> , bool ack, bool acquire, bool exclusive, const std::string& tag, @@ -120,8 +121,8 @@ class ReplicatingSubscription : public b private: typedef std::map<framing::SequenceNumber, broker::QueuedMessage> Delayed; + HaBroker& haBroker; LogPrefix logPrefix; - std::string logSuffix; boost::shared_ptr<broker::Queue> dummy; // Used to send event messages Delayed delayed; framing::SequenceSet dequeues; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org