Author: gsim
Date: Fri Nov 16 21:32:11 2012
New Revision: 1410576
URL: http://svn.apache.org/viewvc?rev=1410576&view=rev
Log:
QPID-4368: Small improvements to setting and checking filter descriptors
Modified:
qpid/trunk/qpid/cpp/src/qpid/amqp/descriptors.h
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/amqp/descriptors.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp/descriptors.h?rev=1410576&r1=1410575&r2=1410576&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp/descriptors.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp/descriptors.h Fri Nov 16 21:32:11 2012
@@ -73,7 +73,14 @@ const Descriptor SASL_INIT(SASL_INIT_COD
const Descriptor SASL_CHALLENGE(SASL_CHALLENGE_CODE);
const Descriptor SASL_RESPONSE(SASL_RESPONSE_CODE);
const Descriptor SASL_OUTCOME(SASL_OUTCOME_CODE);
+}
+
+namespace filters {
+// Descriptors identifying the legacy binding-key filters. A filter's
+// descriptor may be given either as a symbol or as a numeric code, so each
+// filter kind has one constant of each form; the symbols must be distinct
+// so that a symbolic descriptor identifies exactly one filter kind.
+const std::string LEGACY_DIRECT_FILTER_SYMBOL("apache.org:legacy-amqp-direct-binding:string");
+const std::string LEGACY_TOPIC_FILTER_SYMBOL("apache.org:legacy-amqp-topic-binding:string");
+const uint64_t LEGACY_DIRECT_FILTER_CODE(0x0000468C00000000);
+const uint64_t LEGACY_TOPIC_FILTER_CODE(0x0000468C00000001);
}
}} // namespace qpid::amqp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1410576&r1=1410575&r2=1410576&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp Fri Nov 16 21:32:11 2012
@@ -24,10 +24,13 @@
#include "ManagedConnection.h"
#include "qpid/amqp/CharSequence.h"
#include "qpid/amqp/Descriptor.h"
+#include "qpid/amqp/descriptors.h"
#include "qpid/broker/AsyncCompletion.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/DeliverableMessage.h"
#include "qpid/broker/Exchange.h"
+#include "qpid/broker/DirectExchange.h"
+#include "qpid/broker/TopicExchange.h"
#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/Message.h"
#include "qpid/broker/Queue.h"
@@ -134,13 +137,33 @@ void Session::attach(pn_link_t* link)
c.size = d.size;
descriptor = qpid::amqp::Descriptor(c);
} else {
- QPID_LOG(notice, "Ignoring filter with
descriptor with key " << std::string(fname.start, fname.size) << " and type "
<< pn_data_type(filter));
+ QPID_LOG(notice, "Ignoring filter " <<
std::string(fname.start, fname.size) << " with descriptor of type " <<
pn_data_type(filter));
+ continue;
+ }
+ if
(descriptor.match(qpid::amqp::filters::LEGACY_TOPIC_FILTER_SYMBOL,
qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE)) {
+ if (exchange->getType() ==
qpid::broker::DirectExchange::typeName) {
+ QPID_LOG(info, "Interpreting legacy topic
filter as direct binding key for " << exchange->getName());
+ } else if (exchange->getType() ==
qpid::broker::FanOutExchange::typeName) {
+ QPID_LOG(info, "Ignoring legacy topic
filter on fanout exchange " << exchange->getName());
+ for (int i = 0; i < 3; ++i)
pn_data_next(filter);//move off descriptor, then skip key and value
+ continue;
+ }
+ } else if
(descriptor.match(qpid::amqp::filters::LEGACY_DIRECT_FILTER_SYMBOL,
qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE)) {
+ if (exchange->getType() ==
qpid::broker::TopicExchange::typeName) {
+ QPID_LOG(info, "Interpreting legacy direct
filter as topic binding key for " << exchange->getName());
+ } else if (exchange->getType() ==
qpid::broker::FanOutExchange::typeName) {
+ QPID_LOG(info, "Ignoring legacy direct
filter on fanout exchange " << exchange->getName());
+ for (int i = 0; i < 3; ++i)
pn_data_next(filter);//move off descriptor, then skip key and value
+ continue;
+ }
+ } else {
+ QPID_LOG(notice, "Ignoring filter with
unsupported descriptor " << descriptor);
+ for (int i = 0; i < 3; ++i)
pn_data_next(filter);//move off descriptor, then skip key and value
continue;
}
- QPID_LOG(debug, "Got filter with descriptor " <<
descriptor);
pn_data_next(filter);
} else {
- QPID_LOG(debug, "Got undescribed filter of type "
<< pn_data_type(filter));
+ QPID_LOG(info, "Got undescribed filter of type "
<< pn_data_type(filter));
}
if (pn_data_type(filter) == PN_STRING) {
pn_bytes_t value = pn_data_get_string(filter);
@@ -180,7 +203,7 @@ void Session::attach(pn_link_t* link)
} else if (exchange->getType() == TopicExchange::typeName) {
exchange->bind(queue, "#", 0);
} else {
- throw qpid::Exception("Exchange type not yet supported over
1.0: " + exchange->getType());/*not-supported?*/
+ throw qpid::Exception("Exchange type requires a filter: " +
exchange->getType());/*not-supported?*/
}
boost::shared_ptr<Outgoing> q(new Outgoing(broker, queue, link,
*this, out, true));
senders[link] = q;
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp?rev=1410576&r1=1410575&r2=1410576&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp Fri Nov 16 21:32:11 2012
@@ -21,6 +21,7 @@
#include "ReceiverContext.h"
#include "qpid/messaging/Duration.h"
#include "qpid/messaging/Message.h"
+#include "qpid/amqp/descriptors.h"
extern "C" {
#include <proton/engine.h>
}
@@ -94,6 +95,15 @@ pn_bytes_t convert(const std::string& s)
result.size = s.size();
return result;
}
+// Reports whether the given key contains at least one '*' or '#' character.
+bool hasWildcards(const std::string& key)
+{
+    return key.find_first_of("*#") != std::string::npos;
+}
+
+// Picks the numeric filter descriptor for a subject key: the legacy topic
+// filter code when the key contains wildcards, otherwise the legacy direct
+// filter code.
+uint64_t getFilterDescriptor(const std::string& key)
+{
+    if (hasWildcards(key)) {
+        return qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE;
+    }
+    return qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE;
+}
}
void ReceiverContext::configure() const
@@ -110,7 +120,7 @@ void ReceiverContext::configure(pn_termi
pn_data_put_symbol(filter, convert("subject"));
pn_data_put_described(filter);
pn_data_enter(filter);
- pn_data_put_ulong(filter, 0x0000468C00000001/*LEGACY_TOPIC_FILTER*/);
+ pn_data_put_ulong(filter, getFilterDescriptor(address.getSubject()));
pn_data_put_string(filter, convert(address.getSubject()));
pn_data_exit(filter);
pn_data_exit(filter);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]