Author: aconway
Date: Thu Jan 19 23:04:44 2012
New Revision: 1233652
URL: http://svn.apache.org/viewvc?rev=1233652&view=rev
Log:
QPID-3603: Cleanup of HA log messages.
Added:
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Logging.cpp (with props)
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Logging.h (with props)
Modified:
qpid/branches/qpid-3603-2/qpid/cpp/src/ha.mk
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaPlugin.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/ha.mk
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/ha.mk?rev=1233652&r1=1233651&r2=1233652&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/ha.mk (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/ha.mk Thu Jan 19 23:04:44 2012
@@ -28,6 +28,8 @@ ha_la_SOURCES = \
qpid/ha/HaBroker.cpp \
qpid/ha/HaBroker.h \
qpid/ha/HaPlugin.cpp \
+ qpid/ha/Logging.h \
+ qpid/ha/Logging.cpp \
qpid/ha/Settings.h \
qpid/ha/QueueReplicator.h \
qpid/ha/QueueReplicator.cpp \
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1233652&r1=1233651&r2=1233652&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp Thu Jan 19
23:04:44 2012
@@ -46,7 +46,7 @@ Backup::Backup(broker::Broker& b, const
// FIXME aconway 2011-11-24: identifying the primary.
if (s.brokerUrl != "primary") { // FIXME aconway 2011-11-22: temporary
hack to identify primary.
Url url(s.brokerUrl);
- QPID_LOG(info, "HA: Acting as backup to " << url);
+ QPID_LOG(info, "HA: Acting as backup");
string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
// FIXME aconway 2011-11-17: TBD: link management, discovery,
fail-over.
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1233652&r1=1233651&r2=1233652&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp Thu Jan 19
23:04:44 2012
@@ -59,8 +59,8 @@ HaBroker::HaBroker(broker::Broker& b, co
mgmtObject->set_status("solo");
ma->addObject(mgmtObject);
}
- QPID_LOG(notice, "HA: broker initialized, client-url=" << clientUrl
- << ", broker-url=" << brokerUrl);
+ QPID_LOG(notice, "HA: Initialized: client-url=" << clientUrl
+ << " broker-url=" << brokerUrl);
backup.reset(new Backup(broker, s));
// Register a factory for replicating subscriptions.
broker.getConsumerFactories().add(
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaPlugin.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaPlugin.cpp?rev=1233652&r1=1233651&r2=1233652&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaPlugin.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaPlugin.cpp Thu Jan 19
23:04:44 2012
@@ -56,7 +56,6 @@ struct HaPlugin : public Plugin {
void initialize(Plugin::Target& target) {
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
if (broker && settings.enabled) {
- QPID_LOG(info, "HA: Enabled");
haBroker.reset(new ha::HaBroker(*broker, settings));
} else
QPID_LOG(info, "HA: Disabled");
Added: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Logging.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Logging.cpp?rev=1233652&view=auto
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Logging.cpp (added)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Logging.cpp Thu Jan 19
23:04:44 2012
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Logging.h"
+#include "qpid/broker/QueuedMessage.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/framing/SequenceNumber.h"
+
+namespace qpid {
+namespace ha {
+
+QueuePos::QueuePos(const broker::QueuedMessage& qm)
+ : queue(qm.queue), position(qm.position) {}
+
+std::ostream& operator<<(std::ostream& o, const QueuePos& qp) {
+ return o << qp.queue->getName() << "[" << qp.position << "]";
+}
+
+}} // namesopace qpid::ha
Propchange: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Logging.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Logging.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Logging.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Logging.h?rev=1233652&view=auto
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Logging.h (added)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Logging.h Thu Jan 19
23:04:44 2012
@@ -0,0 +1,55 @@
+#ifndef QPID_HA_HAOSTREAM_H
+#define QPID_HA_HAOSTREAM_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iosfwd>
+
+/**@file ostream helpers used in log messages. */
+
+namespace qpid {
+
+namespace broker {
+class Queue;
+class QueuedMessage;
+}
+
+namespace framing {
+class SequenceNumber;
+}
+
+namespace ha {
+
+// Other printable helpers
+
+struct QueuePos {
+ const broker::Queue* queue;
+ const framing::SequenceNumber& position;
+ QueuePos(const broker::Queue* q, const framing::SequenceNumber& pos)
+ : queue(q), position(pos) {}
+ QueuePos(const broker::QueuedMessage& qm);
+};
+
+std::ostream& operator<<(std::ostream& o, const QueuePos& h);
+
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_HAOSTREAM_H*/
Propchange: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Logging.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Logging.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1233652&r1=1233651&r2=1233652&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Thu Jan
19 23:04:44 2012
@@ -21,6 +21,7 @@
#include "QueueReplicator.h"
#include "ReplicatingSubscription.h"
+#include "Logging.h"
#include "qpid/broker/Bridge.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Link.h"
@@ -82,7 +83,7 @@ void QueueReplicator::initializeBridge(B
peer.getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1,
0, false, "", 0, settings);
peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
- QPID_LOG(debug, "Activated route from queue " << args.i_src << " to " <<
args.i_dest);
+ QPID_LOG(debug, "HA: Activated route from queue " << args.i_src << " to "
<< args.i_dest);
}
@@ -105,14 +106,17 @@ void QueueReplicator::route(Deliverable&
if (current < *i) {
//haven't got that far yet, record the dequeue
dequeued.add(*i);
- QPID_LOG(debug, "Recording dequeue of message at " << *i << "
from " << queue->getName());
+ QPID_LOG(trace, "HA: Recording dequeue of message at " <<
+ QueuePos(queue.get(), *i));
} else {
QueuedMessage message;
if (queue->acquireMessageAt(*i, message)) {
queue->dequeue(0, message);
- QPID_LOG(info, "Dequeued message at " << *i << " from " <<
queue->getName());
+ QPID_LOG(info, "HA: Dequeued message "<<
QueuePos(message));
} else {
- QPID_LOG(error, "Unable to dequeue message at " << *i << "
from " << queue->getName());
+ // FIXME aconway 2011-11-29: error handling
+ QPID_LOG(error, "HA: Unable to dequeue message at "
+ << QueuePos(queue.get(), *i));
}
}
}
@@ -121,10 +125,10 @@ void QueueReplicator::route(Deliverable&
//dequeued before our subscription reached them
while (dequeued.contains(++current)) {
dequeued.remove(current);
- QPID_LOG(debug, "Skipping dequeued message at " << current << "
from " << queue->getName());
+ QPID_LOG(debug, "HA: Skipping dequeued message at " << current <<
" from " << queue->getName());
queue->setPosition(current);
}
- QPID_LOG(info, "Enqueued message on " << queue->getName() << ";
currently at " << current);
+ QPID_LOG(info, "HA: Enqueued message on " << queue->getName() << ";
currently at " << current);
msg.deliverTo(queue);
}
}
Modified:
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1233652&r1=1233651&r2=1233652&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
Thu Jan 19 23:04:44 2012
@@ -20,6 +20,7 @@
*/
#include "ReplicatingSubscription.h"
+#include "Logging.h"
#include "qpid/broker/Queue.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/MessageTransferBody.h"
@@ -39,7 +40,7 @@ const string ReplicatingSubscription::QP
const string
ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER("qpid.low_sequence_number");
const string DOLLAR("$");
-const string INTERNAL("_internal");
+const string INTERNAL("-internal");
class ReplicationStateInitialiser
{
@@ -55,7 +56,7 @@ class ReplicationStateInitialiser
void operator()(const QueuedMessage& message) {
if (message.position < start) {
//replica does not have a message that should still be on the queue
- QPID_LOG(warning, "Replica appears to be missing message at " <<
message.position);
+ QPID_LOG(warning, "HA: Replica missing message " <<
QueuePos(message));
} else if (message.position >= start && message.position <= end) {
//i.e. message is within the intial range and has not been
dequeued, so remove it from the results
results.remove(message.position);
@@ -75,63 +76,63 @@ string mask(const string& in)
boost::shared_ptr<broker::SemanticState::ConsumerImpl>
ReplicatingSubscription::Factory::create(
- SemanticState* _parent,
- const string& _name,
- Queue::shared_ptr _queue,
+ SemanticState* parent,
+ const string& name,
+ Queue::shared_ptr queue,
bool ack,
- bool _acquire,
- bool _exclusive,
- const string& _tag,
- const string& _resumeId,
- uint64_t _resumeTtl,
- const framing::FieldTable& _arguments
+ bool acquire,
+ bool exclusive,
+ const string& tag,
+ const string& resumeId,
+ uint64_t resumeTtl,
+ const framing::FieldTable& arguments
) {
-
return boost::shared_ptr<broker::SemanticState::ConsumerImpl>(
- new ReplicatingSubscription(_parent, _name, _queue, ack, _acquire,
_exclusive, _tag, _resumeId, _resumeTtl, _arguments));
+ new ReplicatingSubscription(parent, name, queue, ack, acquire,
exclusive, tag, resumeId, resumeTtl, arguments));
}
ReplicatingSubscription::ReplicatingSubscription(
- SemanticState* _parent,
- const string& _name,
- Queue::shared_ptr _queue,
+ SemanticState* parent,
+ const string& name,
+ Queue::shared_ptr queue,
bool ack,
- bool _acquire,
- bool _exclusive,
- const string& _tag,
- const string& _resumeId,
- uint64_t _resumeTtl,
- const framing::FieldTable& _arguments
-) : ConsumerImpl(_parent, _name, _queue, ack, _acquire, _exclusive, _tag,
_resumeId, _resumeTtl, _arguments),
- events(new Queue(mask(_name))),
+ bool acquire,
+ bool exclusive,
+ const string& tag,
+ const string& resumeId,
+ uint64_t resumeTtl,
+ const framing::FieldTable& arguments
+) : ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag,
+ resumeId, resumeTtl, arguments),
+ events(new Queue(mask(name))),
consumer(new DelegatingConsumer(*this))
{
+ QPID_LOG(debug, "HA: Replicating subscription " << name << " to " <<
queue->getName());
// FIXME aconway 2011-11-25: string constants.
- QPID_LOG(debug, "HA: replicating subscription " << _name << " to " <<
_queue->getName());
- if (_arguments.isSet("qpid.high_sequence_number")) {
- qpid::framing::SequenceNumber hwm =
_arguments.getAsInt("qpid.high_sequence_number");
+ if (arguments.isSet("qpid.high_sequence_number")) {
+ qpid::framing::SequenceNumber hwm =
arguments.getAsInt("qpid.high_sequence_number");
qpid::framing::SequenceNumber lwm;
- if (_arguments.isSet("qpid.low_sequence_number")) {
- lwm = _arguments.getAsInt("qpid.low_sequence_number");
+ if (arguments.isSet("qpid.low_sequence_number")) {
+ lwm = arguments.getAsInt("qpid.low_sequence_number");
} else {
lwm = hwm;
}
qpid::framing::SequenceNumber oldest;
- if (_queue->getOldest(oldest)) {
+ if (queue->getOldest(oldest)) {
if (oldest >= hwm) {
range.add(lwm, --oldest);
} else if (oldest >= lwm) {
ReplicationStateInitialiser initialiser(range, lwm, hwm);
- _queue->eachMessage(initialiser);
+ queue->eachMessage(initialiser);
} else { //i.e. have older message on master than is reported to
exist on replica
- QPID_LOG(warning, "Replica appears to be missing message on
master");
+ QPID_LOG(warning, "HA: Replica missing message on master");
}
} else {
//local queue (i.e. master) is empty
- range.add(lwm, _queue->getPosition());
+ range.add(lwm, queue->getPosition());
}
- QPID_LOG(debug, "Initial set of dequeues for " << _queue->getName() <<
" are " << range
- << " (lwm=" << lwm << ", hwm=" << hwm << ", current=" <<
_queue->getPosition() << ")");
+ QPID_LOG(debug, "HA: Initial set of dequeues for " << queue->getName()
<< " are " << range
+ << " (lwm=" << lwm << ", hwm=" << hwm << ", current=" <<
queue->getPosition() << ")");
//set position of 'cursor'
position = hwm;
}
@@ -142,11 +143,6 @@ bool ReplicatingSubscription::deliver(Qu
return ConsumerImpl::deliver(m);
}
-void ReplicatingSubscription::init()
-{
-
getQueue()->addObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
-}
-
void ReplicatingSubscription::cancel()
{
getQueue()->removeObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
@@ -158,10 +154,9 @@ ReplicatingSubscription::~ReplicatingSub
//under the message lock in the queue
void ReplicatingSubscription::enqueued(const QueuedMessage& m)
{
- QPID_LOG(debug, "Enqueued message at " << m.position);
+ QPID_LOG(trace, "HA: Enqueued message " << QueuePos(m));
//delay completion
m.payload->getIngressCompletion().startCompleter();
- QPID_LOG(debug, "Delayed " << m.payload.get());
}
void ReplicatingSubscription::generateDequeueEvent()
@@ -203,12 +198,13 @@ void ReplicatingSubscription::dequeued(c
{
sys::Mutex::ScopedLock l(lock);
range.add(m.position);
- QPID_LOG(debug, "Updated dequeue event to include message at " <<
m.position << "; subscription is at " << position);
+ // FIXME aconway 2011-11-29: q[pos]
+ QPID_LOG(trace, "HA: Updated dequeue event to include " << QueuePos(m)
<< "; subscription is at " << position);
}
notify();
if (m.position > position) {
m.payload->getIngressCompletion().finishCompleter();
- QPID_LOG(debug, "Completed " << m.payload.get() << " early due to
dequeue");
+ QPID_LOG(trace, "HA: Completed " << QueuePos(m) << " early due to
dequeue");
}
}
@@ -237,5 +233,4 @@ bool ReplicatingSubscription::Delegating
void ReplicatingSubscription::DelegatingConsumer::cancel() {}
OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() {
return delegate.getSession(); }
-
}} // namespace qpid::broker
Modified:
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1233652&r1=1233651&r2=1233652&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
Thu Jan 19 23:04:44 2012
@@ -71,7 +71,6 @@ class ReplicatingSubscription : public b
~ReplicatingSubscription();
- void init();
void cancel();
bool deliver(broker::QueuedMessage& msg);
void enqueued(const broker::QueuedMessage&);
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp?rev=1233652&r1=1233651&r2=1233652&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp Thu Jan
19 23:04:44 2012
@@ -213,7 +213,7 @@ void WiringReplicator::initializeBridge(
sendQuery(QUEUE, queueName, sessionHandler);
sendQuery(EXCHANGE, queueName, sessionHandler);
sendQuery(BINDING, queueName, sessionHandler);
- QPID_LOG(debug, "Activated wiring replicator")
+ QPID_LOG(debug, "HA: Activated wiring replicator")
}
void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const
framing::FieldTable* headers) {
@@ -227,10 +227,10 @@ void WiringReplicator::route(Deliverable
if (headers->getAsString(QMF_CONTENT) == EVENT) {
for (Variant::List::iterator i = list.begin(); i != list.end();
++i) {
- Variant::Map& map = list.front().asMap();
+ Variant::Map& map = i->asMap();
Variant::Map& schema = map[SCHEMA_ID].asMap();
Variant::Map& values = map[VALUES].asMap();
- QPID_LOG(trace, "HA: Configuration event from primary: " <<
values);
+ QPID_LOG(trace, "HA: Configuration event: schema=" << schema
<< " values=" << values);
if (match<EventQueueDeclare>(schema))
doEventQueueDeclare(values);
else if (match<EventQueueDelete>(schema))
doEventQueueDelete(values);
else if (match<EventExchangeDeclare>(schema))
doEventExchangeDeclare(values);
@@ -246,7 +246,7 @@ void WiringReplicator::route(Deliverable
Variant::Map& values = i->asMap()[VALUES].asMap();
framing::FieldTable args;
amqp_0_10::translate(values[ARGUMENTS].asMap(), args);
- QPID_LOG(trace, "HA: Configuration response from primary: " <<
values);
+ QPID_LOG(trace, "HA: Configuration response type=" << type <<
" values=" << values);
if (type == QUEUE) doResponseQueue(values);
else if (type == EXCHANGE) doResponseExchange(values);
else if (type == BINDING) doResponseBind(values);
@@ -284,6 +284,7 @@ void WiringReplicator::doEventQueueDecla
// re-create from event.
// Events are always up to date, whereas responses may be
// out of date.
+ QPID_LOG(debug, "HA: New queue replica " << name);
startQueueReplicator(result.first);
} else {
QPID_LOG(warning, "HA: Replicated queue " << name << " already
exists");
@@ -309,7 +310,7 @@ void WiringReplicator::doEventExchangeDe
string name = values[EXNAME].asString();
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
- QPID_LOG(debug, "HA: Creating exchange from event " << name);
+ QPID_LOG(debug, "HA: New exchange replica " << name);
if (!broker.createExchange(
name,
values[EXTYPE].asString(),
@@ -320,7 +321,7 @@ void WiringReplicator::doEventExchangeDe
values[RHOST].asString()).second) {
// FIXME aconway 2011-11-22: should delete pre-exisitng exchange
// and re-create from event. See comment in doEventQueueDeclare.
- QPID_LOG(warning, "Replicated exchange " << name << " already
exists");
+ QPID_LOG(warning, "HA: Replicated exchange " << name << " already
exists");
}
}
}
@@ -348,7 +349,7 @@ void WiringReplicator::doEventBind(Varia
framing::FieldTable args;
amqp_0_10::translate(values[ARGS].asMap(), args);
string key = values[KEY].asString();
- QPID_LOG(debug, "Replicated binding exchange=" <<
exchange->getName()
+ QPID_LOG(debug, "HA: Replicated binding exchange=" <<
exchange->getName()
<< " queue=" << queue->getName()
<< " key=" << key);
exchange->bind(queue, key, &args);
@@ -363,7 +364,6 @@ void WiringReplicator::doResponseQueue(V
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
string name(values[NAME].asString());
- QPID_LOG(debug, "Creating replicated queue " << values[NAME].asString() <<
" (in catch-up)");
std::pair<boost::shared_ptr<Queue>, bool> result =
broker.createQueue(
name,
@@ -375,11 +375,12 @@ void WiringReplicator::doResponseQueue(V
""/*TODO: who is the user?*/,
""/*TODO: what should we use as connection id?*/);
if (result.second) {
+ QPID_LOG(debug, "HA: New queue replica: " << values[NAME] << " (in
catch-up)");
startQueueReplicator(result.first);
} else {
// FIXME aconway 2011-11-22: Normal to find queue already
// exists if we're failing over.
- QPID_LOG(warning, "Replicated queue " << values[NAME] << " already
exists (in catch-up)");
+ QPID_LOG(warning, "HA: Replicated queue " << values[NAME] << " already
exists (in catch-up)");
}
}
@@ -388,7 +389,7 @@ void WiringReplicator::doResponseExchang
if (!replicateLevel(argsMap)) return;
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
- QPID_LOG(debug, "Creating replicated exchange " << values[NAME].asString()
<< " (in catch-up)");
+ QPID_LOG(debug, "HA: New exchange replica " << values[NAME] << " (in
catch-up)");
if (!broker.createExchange(
values[NAME].asString(),
values[TYPE].asString(),
@@ -397,7 +398,7 @@ void WiringReplicator::doResponseExchang
args,
""/*TODO: who is the user?*/,
""/*TODO: what should we use as connection id?*/).second) {
- QPID_LOG(warning, "Replicated exchange " << values[QNAME] << " already
exists (in catch-up)");
+ QPID_LOG(warning, "HA: Replicated exchange " << values[QNAME] << "
already exists (in catch-up)");
}
}
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py?rev=1233652&r1=1233651&r2=1233652&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py Thu Jan 19
23:04:44 2012
@@ -39,10 +39,6 @@ class ShortTests(BrokerTest):
] + args,
**kwargs)
- def setup_wiring(self, primary, backup):
- cmd="qpid-route route add %s %s qpid.node-cloner x"%(backup, primary)
- self.assertEqual(0, os.system(cmd))
-
# FIXME aconway 2011-11-15: work around async replication.
def wait(self, session, address):
def check():
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]