Author: gsim
Date: Sat Nov 17 17:08:14 2012
New Revision: 1410750

URL: http://svn.apache.org/viewvc?rev=1410750&view=rev
Log:
QPID-4368: Added support for subject filtering on queues

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.cpp?rev=1410750&r1=1410749&r2=1410750&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.cpp Sat Nov 17 17:08:14 2012
@@ -74,6 +74,11 @@ bool Filter::hasSubjectFilter() const
     return !subjectFilter.value.empty();
 }
 
+std::string Filter::getSubjectFilter() const
+{
+    return subjectFilter.value;
+}
+
 
 void Filter::setSubjectFilter(const StringFilter& filter)
 {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.h?rev=1410750&r1=1410749&r2=1410750&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.h Sat Nov 17 17:08:14 2012
@@ -39,6 +39,7 @@ class Filter : qpid::amqp::MapReader
     void read(pn_data_t*);
     void write(pn_data_t*);
     bool hasSubjectFilter() const;
+    std::string getSubjectFilter() const;
     void bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> 
queue);
   private:
     struct StringFilter

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp?rev=1410750&r1=1410749&r2=1410750&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp Sat Nov 17 17:08:14 
2012
@@ -22,6 +22,7 @@
 #include "qpid/broker/amqp/Header.h"
 #include "qpid/broker/amqp/Translation.h"
 #include "qpid/broker/Queue.h"
+#include "qpid/broker/TopicKeyNode.h"
 #include "qpid/sys/OutputControl.h"
 #include "qpid/amqp/MessageEncoder.h"
 #include "qpid/log/Statement.h"
@@ -163,6 +164,57 @@ bool Outgoing::accept(const qpid::broker
     return canDeliver();
 }
 
+void Outgoing::setSubjectFilter(const std::string& f)
+{
+    subjectFilter = f;
+}
+
+namespace {
+
+bool match(TokenIterator& filter, TokenIterator& target)
+{
+    bool wild = false;
+    while (!filter.finished())
+    {
+        if (filter.match1('*')) {
+            if (target.finished()) return false;
+            //else move to next word in filter target
+            filter.next();
+            target.next();
+        } else if (filter.match1('#')) {
+            // i.e. filter word is '#' which can match a variable number of 
words in the target
+            filter.next();
+            if (filter.finished()) return true;
+            else if (target.finished()) return false;
+            wild = true;
+        } else {
+            //filter word needs to match target exactly
+            if (target.finished()) return false;
+            std::string word;
+            target.pop(word);
+            if (filter.match(word)) {
+                wild = false;
+                filter.next();
+            } else if (!wild) {
+                return false;
+            }
+        }
+    }
+    return target.finished();
+}
+bool match(const std::string& filter, const std::string& target)
+{
+    TokenIterator lhs(filter);
+    TokenIterator rhs(target);
+    return match(lhs, rhs);
+}
+}
+
+bool Outgoing::filter(const qpid::broker::Message& m)
+{
+    return subjectFilter.empty() || subjectFilter == m.getRoutingKey() || 
match(subjectFilter, m.getRoutingKey());
+}
+
 void Outgoing::cancel() {}
 
 void Outgoing::acknowledged(const qpid::broker::DeliveryRecord&) {}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h?rev=1410750&r1=1410749&r2=1410750&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h Sat Nov 17 17:08:14 2012
@@ -60,6 +60,7 @@ class Outgoing : public qpid::broker::Co
 {
   public:
     Outgoing(Broker&,boost::shared_ptr<Queue> q, pn_link_t* l, 
ManagedSession&, qpid::sys::OutputControl& o, bool topic);
+    void setSubjectFilter(const std::string&);
     void init();
     bool dispatch();
     void write(const char* data, size_t size);
@@ -71,6 +72,7 @@ class Outgoing : public qpid::broker::Co
     bool deliver(const QueueCursor& cursor, const qpid::broker::Message& msg);
     void notify();
     bool accept(const qpid::broker::Message&);
+    bool filter(const qpid::broker::Message&);
     void cancel();
     void acknowledged(const qpid::broker::DeliveryRecord&);
     qpid::broker::OwnershipToken* getSession();
@@ -99,6 +101,7 @@ class Outgoing : public qpid::broker::Co
     size_t current;
     int outstanding;
     std::vector<char> buffer;
+    std::string subjectFilter;
 };
 }}} // namespace qpid::broker::amqp
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1410750&r1=1410749&r2=1410750&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp Sat Nov 17 17:08:14 
2012
@@ -122,18 +122,21 @@ void Session::attach(pn_link_t* link)
         pn_terminus_set_address(pn_link_source(link), name.c_str());
 
         ResolvedNode node = resolve(name, source);
+        Filter filter;
+        filter.read(pn_terminus_filter(source));
 
         if (node.queue) {
             boost::shared_ptr<Outgoing> q(new Outgoing(broker, node.queue, 
link, *this, out, false));
             q->init();
+            if (filter.hasSubjectFilter()) {
+                q->setSubjectFilter(filter.getSubjectFilter());
+            }
             senders[link] = q;
         } else if (node.exchange) {
             QueueSettings settings(false, true);
             //TODO: populate settings from source details when available from 
engine
             boost::shared_ptr<qpid::broker::Queue> queue
                 = broker.createQueue(name + qpid::types::Uuid(true).str(), 
settings, this, "", connection.getUserid(), connection.getId()).first;
-            Filter filter;
-            filter.read(pn_terminus_filter(source));
             if (filter.hasSubjectFilter()) {
                 filter.bind(node.exchange, queue);
                 filter.write(pn_terminus_filter(pn_link_source(link)));



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to