Author: aconway
Date: Thu Jan 19 23:08:02 2012
New Revision: 1233678
URL: http://svn.apache.org/viewvc?rev=1233678&view=rev
Log:
QPID-3603: HA logging improvements.
Modified:
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.cpp?rev=1233678&r1=1233677&r2=1233678&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.cpp Thu Jan 19
23:08:02 2012
@@ -249,7 +249,6 @@ void Link::ioThreadProcessing()
if (state != STATE_OPERATIONAL)
return;
- QPID_LOG(debug, "Link::ioThreadProcessing()");
// check for bridge session errors and recover
if (!active.empty()) {
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1233678&r1=1233677&r2=1233678&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Thu Jan
19 23:08:02 2012
@@ -55,7 +55,7 @@ QueueReplicator::QueueReplicator(boost::
: Exchange(replicatorName(q->getName()), 0, q->getBroker()), queue(q),
link(l)
{
std::stringstream ss;
- ss << "HA: Backup queue " << queue->getName() << ": ";
+ ss << "HA: Backup " << queue->getName() << ": ";
logPrefix = ss.str();
QPID_LOG(info, logPrefix << "Created, settings: " << q->getSettings());
}
@@ -133,12 +133,10 @@ template <class T> T decodeContent(Messa
void QueueReplicator::dequeue(SequenceNumber n, const
sys::Mutex::ScopedLock&) {
// Thread safe: only calls thread safe Queue functions.
- if (queue->getPosition() >= n) { // Ignore dequeus we haven't reached yet
+ if (queue->getPosition() >= n) { // Ignore messages we haven't reached yet
QueuedMessage message;
- if (queue->acquireMessageAt(n, message)) {
+ if (queue->acquireMessageAt(n, message))
queue->dequeue(0, message);
- QPID_LOG(trace, logPrefix << "Dequeued message "<<
message.position);
- }
}
}
@@ -148,13 +146,13 @@ void QueueReplicator::route(Deliverable&
sys::Mutex::ScopedLock l(lock);
if (key == DEQUEUE_EVENT_KEY) {
SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage());
- QPID_LOG(trace, logPrefix << "Dequeue update: " << dequeues);
+ QPID_LOG(trace, logPrefix << "Dequeue: " << dequeues);
//TODO: should be able to optimise the following
for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end();
i++)
dequeue(*i, l);
} else if (key == POSITION_EVENT_KEY) {
SequenceNumber position =
decodeContent<SequenceNumber>(msg.getMessage());
- QPID_LOG(trace, logPrefix << "Position update: from " <<
queue->getPosition()
+ QPID_LOG(trace, logPrefix << "Position moved from " <<
queue->getPosition()
<< " to " << position);
assert(queue->getPosition() <= position);
//TODO aconway 2011-12-14: Optimize this?
Modified:
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1233678&r1=1233677&r2=1233678&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
Thu Jan 19 23:08:02 2012
@@ -47,6 +47,7 @@ string mask(const string& in)
return DOLLAR + in + INTERNAL;
}
+/* Called by SemanticState::consume to create a consumer */
boost::shared_ptr<broker::SemanticState::ConsumerImpl>
ReplicatingSubscription::Factory::create(
SemanticState* parent,
@@ -122,8 +123,6 @@ bool ReplicatingSubscription::deliver(Qu
SequenceNumber send(m.position);
--send; // Send the position before m was enqueued.
sendPositionEvent(send, l);
- QPID_LOG(trace, logPrefix << "Sending position " << send
- << ", was " << backupPosition);
}
backupPosition = m.position;
QPID_LOG(trace, logPrefix << "Replicating message " << m.position);
@@ -137,13 +136,14 @@ ReplicatingSubscription::~ReplicatingSub
void ReplicatingSubscription::cancel()
{
QPID_LOG(debug, logPrefix <<"Cancelled backup subscription " << getName());
-
getQueue()->removeObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
+ getQueue()->removeObserver(
+ boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
+ ConsumerImpl::cancel();
}
// Called before we get notified of the message being available and
// under the message lock in the queue. Called in arbitrary connection thread.
-void ReplicatingSubscription::enqueued(const QueuedMessage& m)
-{
+void ReplicatingSubscription::enqueued(const QueuedMessage& m) {
//delay completion
m.payload->getIngressCompletion().startCompleter();
}
@@ -164,6 +164,8 @@ void ReplicatingSubscription::sendDequeu
void ReplicatingSubscription::sendPositionEvent(
SequenceNumber position, const sys::Mutex::ScopedLock&l )
{
+ QPID_LOG(trace, logPrefix << "Sending position " << position
+ << ", was " << backupPosition);
string buf(backupPosition.encodedSize(),'\0');
framing::Buffer buffer(&buf[0], buf.size());
position.encode(buffer);
@@ -207,7 +209,6 @@ void ReplicatingSubscription::sendEvent(
// the messageLock in the queue. Called in arbitrary connection threads.
void ReplicatingSubscription::dequeued(const QueuedMessage& m)
{
- QPID_LOG(trace, logPrefix << "Dequeued message " << m.position);
{
sys::Mutex::ScopedLock l(lock);
dequeues.add(m.position);
@@ -219,8 +220,10 @@ void ReplicatingSubscription::dequeued(c
// not under the message lock?
if (m.position > position) {
m.payload->getIngressCompletion().finishCompleter();
- QPID_LOG(trace, logPrefix << "Completed message " << m.position <<
" early");
+ QPID_LOG(trace, logPrefix << "Dequeued and completed message " <<
m.position << " early");
}
+ else
+ QPID_LOG(trace, logPrefix << "Dequeued message " << m.position);
}
notify(); // Ensure a call to doDispatch
}
Modified:
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1233678&r1=1233677&r2=1233678&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
Thu Jan 19 23:08:02 2012
@@ -76,13 +76,15 @@ class ReplicatingSubscription : public b
~ReplicatingSubscription();
- void cancel();
+ // QueueObserver overrides.
bool deliver(broker::QueuedMessage& msg);
void enqueued(const broker::QueuedMessage&);
void dequeued(const broker::QueuedMessage&);
void acquired(const broker::QueuedMessage&) {}
void requeued(const broker::QueuedMessage&) {}
+ // Consumer overrides.
+ void cancel();
bool isDelayedCompletion() const { return true; }
protected:
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]