Author: gsim
Date: Wed May  8 11:55:56 2013
New Revision: 1480239

URL: http://svn.apache.org/r1480239
Log:
QPID-4706: allow selectors to be used on links from an exchange

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1480239&r1=1480238&r2=1480239&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Wed May  8 11:55:56 2013
@@ -33,6 +33,7 @@
 #include "qpid/broker/FifoDistributor.h"
 #include "qpid/broker/NullMessageStore.h"
 #include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/Selector.h"
 
 //TODO: get rid of this
 #include "qpid/broker/amqp_0_10/MessageTransfer.h"
@@ -209,6 +210,10 @@ Queue::Queue(const string& _name, const 
     if ( settings.isBrowseOnly ) {
         QPID_LOG ( info, "Queue " << name << " is browse-only." );
     }
+    if (settings.filter.size()) {
+        selector.reset(new Selector(settings.filter));
+        QPID_LOG (info, "Queue " << name << " using filter: " << settings.filter);
+    }
 }
 
 Queue::~Queue()
@@ -234,23 +239,35 @@ bool Queue::isExcluded(const Message& ms
     return traceExclude.size() && msg.isExcluded(traceExclude);
 }
 
-void Queue::deliver(Message msg, TxBuffer* txn){
+bool Queue::accept(const Message& msg)
+{
     //TODO: move some of this out of the queue and into the publishing
     //'link' for whatever protocol is used; that would let protocol
     //specific stuff be kept out the queue
-
     if (broker::amqp_0_10::MessageTransfer::isImmediateDeliveryRequired(msg) && getConsumerCount() == 0) {
         if (alternateExchange) {
             DeliverableMessage deliverable(msg, 0);
             alternateExchange->route(deliverable);
         }
+        return false;
     } else if (isLocal(msg)) {
         //drop message
         QPID_LOG(info, "Dropping 'local' message from " << getName());
+        return false;
     } else if (isExcluded(msg)) {
         //drop message
         QPID_LOG(info, "Dropping excluded message from " << getName());
+        return false;
+    } else if (selector) {
+        return selector->filter(msg);
     } else {
+        return true;
+    }
+}
+
+void Queue::deliver(Message msg, TxBuffer* txn)
+{
+    if (accept(msg)) {
         if (txn) {
             TxOp::shared_ptr op(new TxPublish(msg, shared_from_this()));
             txn->enlist(op);
@@ -1178,6 +1195,10 @@ void tryAutoDeleteImpl(Broker& broker, Q
             << " user:" << userId
             << " rhost:" << connectionId );
         queue->destroyed();
+    } else {
+        QPID_LOG_CAT(debug, model, "Auto-delete queue could not be deleted: " << queue->getName()
+            << " user:" << userId
+            << " rhost:" << connectionId );
     }
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=1480239&r1=1480238&r2=1480239&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Wed May  8 11:55:56 2013
@@ -43,6 +43,7 @@
 #include "qmf/org/apache/qpid/broker/Broker.h"
 #include "qpid/framing/amqp_types.h"
 
+#include <boost/scoped_ptr.hpp>
 #include <boost/shared_ptr.hpp>
 #include <boost/intrusive_ptr.hpp>
 #include <boost/enable_shared_from_this.hpp>
@@ -66,6 +67,7 @@ class QueueDepth;
 class QueueEvents;
 class QueueRegistry;
 class QueueFactory;
+class Selector;
 class TransactionContext;
 class TxBuffer;
 class MessageDistributor;
@@ -165,8 +167,10 @@ class Queue : public boost::enable_share
     UsageBarrier barrier;
     boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
     boost::shared_ptr<MessageDistributor> allocator;
+    boost::scoped_ptr<Selector> selector;
 
     virtual void push(Message& msg, bool isRecovery=false);
+    bool accept(const Message&);
     void process(Message& msg);
     bool enqueue(TransactionContext* ctxt, Message& msg);
     bool getNextMessage(Message& msg, Consumer::shared_ptr& c);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp?rev=1480239&r1=1480238&r2=1480239&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp Wed May  8 11:55:56 2013
@@ -57,6 +57,7 @@ const std::string FAIRSHARE_ALIAS("x-qpi
 const std::string PAGING("qpid.paging");
 const std::string MAX_PAGES("qpid.max_pages_loaded");
 const std::string PAGE_FACTOR("qpid.page_factor");
+const std::string FILTER("qpid.filter");
 
 const std::string LVQ_LEGACY("qpid.last_value_queue");
 const std::string LVQ_LEGACY_KEY("qpid.LVQ_key");
@@ -202,6 +203,9 @@ bool QueueSettings::handle(const std::st
     } else if (key == PAGE_FACTOR) {
         pageFactor = value;
         return true;
+    } else if (key == FILTER) {
+        filter = value.asString();
+        return true;
     } else {
         return false;
     }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.h?rev=1480239&r1=1480238&r2=1480239&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.h Wed May  8 11:55:56 2013
@@ -83,6 +83,8 @@ struct QueueSettings
     uint64_t maxFileSize;
     uint64_t maxFileCount;
 
+    std::string filter;
+
     //yuck, yuck
     qpid::framing::FieldTable storeSettings;
     std::map<std::string, qpid::types::Variant> original;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1480239&r1=1480238&r2=1480239&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp Wed May  8 11:55:56 2013
@@ -272,6 +272,10 @@ void Session::setupOutgoing(pn_link_t* l
         outgoing[link] = q;
     } else if (node.exchange) {
         QueueSettings settings(false, true);
+        if (filter.hasSelectorFilter()) {
+            settings.filter = filter.getSelectorFilter();
+            QPID_LOG(debug, "Selector specified for outgoing link from exchange " << node.exchange->getName() << ": " << settings.filter);
+        }
         //TODO: populate settings from source details when available from engine
         boost::shared_ptr<qpid::broker::Queue> queue
             = broker.createQueue(name + qpid::types::Uuid(true).str(), settings, this, "", connection.getUserid(), connection.getId()).first;

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp?rev=1480239&r1=1480238&r2=1480239&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp Wed May  8 11:55:56 2013
@@ -95,6 +95,7 @@ const std::string X_SUBSCRIBE("x-subscri
 const std::string X_BINDINGS("x-bindings");
 const std::string SELECTOR("selector");
 const std::string APACHE_SELECTOR("x-apache-selector");
+const std::string QPID_FILTER("qpid.filter");
 const std::string EXCHANGE("exchange");
 const std::string QUEUE("queue");
 const std::string KEY("key");
@@ -523,6 +524,8 @@ Subscription::Subscription(const Address
 {
     (Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions);
     (Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions);
+    std::string selector = Opt(address)/LINK/SELECTOR;
+    if (!selector.empty()) queueOptions.setString(QPID_FILTER, selector);
 
     if (!address.getSubject().empty()) bindSubject(address.getSubject());
     else if (linkBindings.empty()) bindAll();



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to