Author: aconway
Date: Mon Feb 13 16:18:30 2012
New Revision: 1243581
URL: http://svn.apache.org/viewvc?rev=1243581&view=rev
Log:
QPID-3603: Additional debug logging for messaging client connections.
Modified:
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
Modified:
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp?rev=1243581&r1=1243580&r2=1243581&view=diff
==============================================================================
---
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
(original)
+++
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
Mon Feb 13 16:18:30 2012
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -140,7 +140,7 @@ struct Binding
{
Binding(const Variant::Map&);
Binding(const std::string& exchange, const std::string& queue, const
std::string& key);
-
+
std::string exchange;
std::string queue;
std::string key;
@@ -243,7 +243,7 @@ class Subscription : public Exchange, pu
FieldTable queueOptions;
FieldTable subscriptionOptions;
Bindings bindings;
-
+
void bindSubject(const std::string& subject);
void bindAll();
void add(const std::string& exchange, const std::string& key);
@@ -328,7 +328,7 @@ Opt& Opt::operator/(const std::string& n
{
if (options) {
Variant::Map::const_iterator j = options->find(name);
- if (j == options->end()) {
+ if (j == options->end()) {
value = 0;
options = 0;
} else {
@@ -373,7 +373,7 @@ void Opt::collect(qpid::framing::FieldTa
bool AddressResolution::is_unreliable(const Address& address)
{
-
+
return in((Opt(address)/LINK/RELIABILITY).str(),
list_of<std::string>(UNRELIABLE)(AT_MOST_ONCE));
}
@@ -475,7 +475,7 @@ void QueueSource::subscribe(qpid::client
checkCreate(session, FOR_RECEIVER);
checkAssert(session, FOR_RECEIVER);
linkBindings.bind(session);
- session.messageSubscribe(arg::queue=name,
+ session.messageSubscribe(arg::queue=name,
arg::destination=destination,
arg::acceptMode=acceptMode,
arg::acquireMode=acquireMode,
@@ -524,7 +524,7 @@ void Subscription::bindSubject(const std
bindings.push_back(b);
} else if (actualType == XML_EXCHANGE) {
Binding b(name, queue, subject);
- std::string query = (boost::format("declare variable $qpid.subject
external; $qpid.subject = '%1%'")
+ std::string query = (boost::format("declare variable $qpid.subject
external; $qpid.subject = '%1%'")
% subject).str();
b.arguments.setString("xquery", query);
bindings.push_back(b);
@@ -540,7 +540,7 @@ void Subscription::bindAll()
if (actualType == TOPIC_EXCHANGE) {
add(name, WILDCARD_ANY);
} else if (actualType == FANOUT_EXCHANGE) {
- add(name, queue);
+ add(name, queue);
} else if (actualType == HEADERS_EXCHANGE) {
Binding b(name, queue, "match-all");
b.arguments.setString("x-match", "all");
@@ -549,7 +549,7 @@ void Subscription::bindAll()
Binding b(name, queue, EMPTY_STRING);
b.arguments.setString("xquery", "true()");
bindings.push_back(b);
- } else {
+ } else {
add(name, EMPTY_STRING);
}
}
@@ -600,12 +600,13 @@ void ExchangeSink::send(qpid::client::As
{
m.message.getDeliveryProperties().setRoutingKey(m.getSubject());
m.status = session.messageTransfer(arg::destination=name,
arg::content=m.message);
+ QPID_LOG(debug, "Sending to exchange " << name << " " <<
m.message.getMessageProperties() << " " << m.message.getDeliveryProperties());
}
void ExchangeSink::cancel(qpid::client::AsyncSession& session, const
std::string&)
{
linkBindings.unbind(session);
- checkDelete(session, FOR_SENDER);
+ checkDelete(session, FOR_SENDER);
}
QueueSink::QueueSink(const Address& address) : Queue(address) {}
@@ -620,6 +621,7 @@ void QueueSink::send(qpid::client::Async
{
m.message.getDeliveryProperties().setRoutingKey(name);
m.status = session.messageTransfer(arg::content=m.message);
+ QPID_LOG(debug, "Sending to queue " << name << " " <<
m.message.getMessageProperties() << " " << m.message.getDeliveryProperties());
}
void QueueSink::cancel(qpid::client::AsyncSession& session, const std::string&)
@@ -654,9 +656,9 @@ qpid::framing::ReplyTo AddressResolution
}
}
-bool isQueue(qpid::client::Session session, const qpid::messaging::Address&
address)
+bool isQueue(qpid::client::Session session, const qpid::messaging::Address&
address)
{
- return address.getType() == QUEUE_ADDRESS ||
+ return address.getType() == QUEUE_ADDRESS ||
(address.getType().empty() &&
session.queueQuery(address.getName()).getQueue() == address.getName());
}
@@ -695,7 +697,7 @@ void Queue::checkCreate(qpid::client::As
{
if (enabled(createPolicy, mode)) {
QPID_LOG(debug, "Auto-creating queue '" << name << "'");
- try {
+ try {
session.queueDeclare(arg::queue=name,
arg::durable=durable,
arg::autoDelete=autoDelete,
@@ -749,7 +751,7 @@ void Queue::checkAssert(qpid::client::As
throw AssertionFailed((boost::format("Queue not exclusive:
%1%") % name).str());
}
if (!alternateExchange.empty() && result.getAlternateExchange() !=
alternateExchange) {
- throw AssertionFailed((boost::format("Alternate exchange does
not match for %1%, expected %2%, got %3%")
+ throw AssertionFailed((boost::format("Alternate exchange does
not match for %1%, expected %2%, got %3%")
% name % alternateExchange %
result.getAlternateExchange()).str());
}
for (FieldTable::ValueMap::const_iterator i = arguments.begin(); i
!= arguments.end(); ++i) {
@@ -839,7 +841,7 @@ void Exchange::checkAssert(qpid::client:
throw NotFound((boost::format("Exchange not found: %1%") %
name).str());
} else {
if (specifiedType.size() && result.getType() != specifiedType) {
- throw AssertionFailed((boost::format("Exchange %1% is of
incorrect type, expected %2% but got %3%")
+ throw AssertionFailed((boost::format("Exchange %1% is of
incorrect type, expected %2% but got %3%")
% name % specifiedType %
result.getType()).str());
}
if (durable && !result.getDurable()) {
@@ -862,7 +864,7 @@ void Exchange::checkAssert(qpid::client:
}
}
-Binding::Binding(const Variant::Map& b) :
+Binding::Binding(const Variant::Map& b) :
exchange((Opt(b)/EXCHANGE).str()),
queue((Opt(b)/QUEUE).str()),
key((Opt(b)/KEY).str())
@@ -916,11 +918,11 @@ void Bindings::unbind(qpid::client::Asyn
void Bindings::check(qpid::client::AsyncSession& session)
{
for (Bindings::const_iterator i = begin(); i != end(); ++i) {
- ExchangeBoundResult result =
sync(session).exchangeBound(arg::queue=i->queue,
+ ExchangeBoundResult result =
sync(session).exchangeBound(arg::queue=i->queue,
arg::exchange=i->exchange,
arg::bindingKey=i->key);
if (result.getQueueNotMatched() || result.getKeyNotMatched()) {
- throw AssertionFailed((boost::format("No such binding
[exchange=%1%, queue=%2%, key=%3%]")
+ throw AssertionFailed((boost::format("No such binding
[exchange=%1%, queue=%2%, key=%3%]")
% i->exchange % i->queue % i->key).str());
}
}
@@ -950,7 +952,7 @@ void Node::convert(const Variant& option
{
if (!options.isVoid()) {
translate(options.asMap(), arguments);
- }
+ }
}
std::vector<std::string> Node::RECEIVER_MODES = list_of<std::string>(ALWAYS)
(RECEIVER);
std::vector<std::string> Node::SENDER_MODES = list_of<std::string>(ALWAYS)
(SENDER);
Modified:
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp?rev=1243581&r1=1243580&r2=1243581&view=diff
==============================================================================
---
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
(original)
+++
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
Mon Feb 13 16:18:30 2012
@@ -259,6 +259,7 @@ void ConnectionImpl::reopen()
void ConnectionImpl::connect(const qpid::sys::AbsTime& started)
{
+ QPID_LOG(debug, "Starting connection, urls=" << asString(urls));
for (double i = minReconnectInterval; !tryConnect(); i = std::min(i*2,
maxReconnectInterval)) {
if (!reconnect) {
throw qpid::messaging::TransportFailure("Failed to connect
(reconnect disabled)");
@@ -269,8 +270,11 @@ void ConnectionImpl::connect(const qpid:
if (expired(started, timeout)) {
throw qpid::messaging::TransportFailure("Failed to connect within
reconnect timeout");
}
+ QPID_LOG(debug, "Connection retry in " << i*1000*1000 << "
microseconds, urls="
+ << asString(urls));
qpid::sys::usleep(int64_t(i*1000*1000)); // Sleep in microseconds.
}
+ QPID_LOG(debug, "Connection successful, urls=" << asString(urls));
retries = 0;
}
Modified:
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp?rev=1243581&r1=1243580&r2=1243581&view=diff
==============================================================================
---
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
(original)
+++
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
Mon Feb 13 16:18:30 2012
@@ -198,7 +198,8 @@ bool IncomingMessages::process(Handler*
if (content->isA<MessageTransferBody>()) {
MessageTransfer transfer(content, *this);
if (handler && handler->accept(transfer)) {
- QPID_LOG(debug, "Delivered " << *content->getMethod());
+ QPID_LOG(debug, "Delivered " << *content->getMethod() << "
"
+ << *content->getHeaders());
return true;
} else {
//received message for another destination, keep for later
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]