Author: gsim
Date: Fri Nov 16 21:32:11 2012
New Revision: 1410576

URL: http://svn.apache.org/viewvc?rev=1410576&view=rev
Log:
QPID-4368: Small improvements to setting and checking filter descriptors

Modified:
    qpid/trunk/qpid/cpp/src/qpid/amqp/descriptors.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/amqp/descriptors.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp/descriptors.h?rev=1410576&r1=1410575&r2=1410576&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp/descriptors.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp/descriptors.h Fri Nov 16 21:32:11 2012
@@ -73,7 +73,14 @@ const Descriptor SASL_INIT(SASL_INIT_COD
 const Descriptor SASL_CHALLENGE(SASL_CHALLENGE_CODE);
 const Descriptor SASL_RESPONSE(SASL_RESPONSE_CODE);
 const Descriptor SASL_OUTCOME(SASL_OUTCOME_CODE);
+}
+
+namespace filters {
+const std::string LEGACY_DIRECT_FILTER_SYMBOL("apache.org:legacy-amqp-direct-binding:string");
+const std::string LEGACY_TOPIC_FILTER_SYMBOL("apache.org:legacy-amqp-topic-binding:string");
 
+const uint64_t LEGACY_DIRECT_FILTER_CODE(0x0000468C00000000);
+const uint64_t LEGACY_TOPIC_FILTER_CODE(0x0000468C00000001);
 }
 
 }} // namespace qpid::amqp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1410576&r1=1410575&r2=1410576&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp Fri Nov 16 21:32:11 2012
@@ -24,10 +24,13 @@
 #include "ManagedConnection.h"
 #include "qpid/amqp/CharSequence.h"
 #include "qpid/amqp/Descriptor.h"
+#include "qpid/amqp/descriptors.h"
 #include "qpid/broker/AsyncCompletion.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/DeliverableMessage.h"
 #include "qpid/broker/Exchange.h"
+#include "qpid/broker/DirectExchange.h"
+#include "qpid/broker/TopicExchange.h"
 #include "qpid/broker/FanOutExchange.h"
 #include "qpid/broker/Message.h"
 #include "qpid/broker/Queue.h"
@@ -134,13 +137,33 @@ void Session::attach(pn_link_t* link)
                                 c.size = d.size;
                                 descriptor = qpid::amqp::Descriptor(c);
                             } else {
-                                QPID_LOG(notice, "Ignoring filter with descriptor with key " << std::string(fname.start, fname.size) << " and type " << pn_data_type(filter));
+                                QPID_LOG(notice, "Ignoring filter " << std::string(fname.start, fname.size) << " with descriptor of type " << pn_data_type(filter));
+                                continue;
+                            }
+                            if (descriptor.match(qpid::amqp::filters::LEGACY_TOPIC_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE)) {
+                                if (exchange->getType() == qpid::broker::DirectExchange::typeName) {
+                                    QPID_LOG(info, "Interpreting legacy topic filter as direct binding key for " << exchange->getName());
+                                } else if (exchange->getType() == qpid::broker::FanOutExchange::typeName) {
+                                    QPID_LOG(info, "Ignoring legacy topic filter on fanout exchange " << exchange->getName());
+                                    for (int i = 0; i < 3; ++i) pn_data_next(filter);//move off descriptor, then skip key and value
+                                    continue;
+                                }
+                            } else if (descriptor.match(qpid::amqp::filters::LEGACY_DIRECT_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE)) {
+                                if (exchange->getType() == qpid::broker::TopicExchange::typeName) {
+                                    QPID_LOG(info, "Interpreting legacy direct filter as topic binding key for " << exchange->getName());
+                                } else if (exchange->getType() == qpid::broker::FanOutExchange::typeName) {
+                                    QPID_LOG(info, "Ignoring legacy direct filter on fanout exchange " << exchange->getName());
+                                    for (int i = 0; i < 3; ++i) pn_data_next(filter);//move off descriptor, then skip key and value
+                                    continue;
+                                }
+                            } else {
+                                QPID_LOG(notice, "Ignoring filter with unsupported descriptor " << descriptor);
+                                for (int i = 0; i < 3; ++i) pn_data_next(filter);//move off descriptor, then skip key and value
                                 continue;
                             }
-                            QPID_LOG(debug, "Got filter with descriptor " << descriptor);
                             pn_data_next(filter);
                         } else {
-                            QPID_LOG(debug, "Got undescribed filter of type " << pn_data_type(filter));
+                            QPID_LOG(info, "Got undescribed filter of type " << pn_data_type(filter));
                         }
                         if (pn_data_type(filter) == PN_STRING) {
                             pn_bytes_t value = pn_data_get_string(filter);
@@ -180,7 +203,7 @@ void Session::attach(pn_link_t* link)
             } else if (exchange->getType() == TopicExchange::typeName) {
                 exchange->bind(queue, "#", 0);
             } else {
-                throw qpid::Exception("Exchange type not yet supported over 1.0: " + exchange->getType());/*not-supported?*/
+                throw qpid::Exception("Exchange type requires a filter: " + exchange->getType());/*not-supported?*/
             }
             boost::shared_ptr<Outgoing> q(new Outgoing(broker, queue, link, *this, out, true));
             senders[link] = q;

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp?rev=1410576&r1=1410575&r2=1410576&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp Fri Nov 16 21:32:11 2012
@@ -21,6 +21,7 @@
 #include "ReceiverContext.h"
 #include "qpid/messaging/Duration.h"
 #include "qpid/messaging/Message.h"
+#include "qpid/amqp/descriptors.h"
 extern "C" {
 #include <proton/engine.h>
 }
@@ -94,6 +95,15 @@ pn_bytes_t convert(const std::string& s)
     result.size = s.size();
     return result;
 }
+bool hasWildcards(const std::string& key)
+{
+    return key.find('*') != std::string::npos || key.find('#') != std::string::npos;
+}
+
+uint64_t getFilterDescriptor(const std::string& key)
+{
+    return hasWildcards(key) ? qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE : qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE;
+}
 }
 
 void ReceiverContext::configure() const
@@ -110,7 +120,7 @@ void ReceiverContext::configure(pn_termi
     pn_data_put_symbol(filter, convert("subject"));
     pn_data_put_described(filter);
     pn_data_enter(filter);
-    pn_data_put_ulong(filter, 0x0000468C00000001/*LEGACY_TOPIC_FILTER*/);
+    pn_data_put_ulong(filter, getFilterDescriptor(address.getSubject()));
     pn_data_put_string(filter, convert(address.getSubject()));
     pn_data_exit(filter);
     pn_data_exit(filter);



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to