Author: gsim
Date: Wed May 8 11:55:56 2013
New Revision: 1480239
URL: http://svn.apache.org/r1480239
Log:
QPID-4706: allow selectors to be used on links from an exchange
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.h
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1480239&r1=1480238&r2=1480239&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Wed May 8 11:55:56 2013
@@ -33,6 +33,7 @@
#include "qpid/broker/FifoDistributor.h"
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/Selector.h"
//TODO: get rid of this
#include "qpid/broker/amqp_0_10/MessageTransfer.h"
@@ -209,6 +210,10 @@ Queue::Queue(const string& _name, const
if ( settings.isBrowseOnly ) {
QPID_LOG ( info, "Queue " << name << " is browse-only." );
}
+ if (settings.filter.size()) {
+ selector.reset(new Selector(settings.filter));
+ QPID_LOG (info, "Queue " << name << " using filter: " << settings.filter);
+ }
}
Queue::~Queue()
@@ -234,23 +239,35 @@ bool Queue::isExcluded(const Message& ms
return traceExclude.size() && msg.isExcluded(traceExclude);
}
-void Queue::deliver(Message msg, TxBuffer* txn){
+bool Queue::accept(const Message& msg)
+{
//TODO: move some of this out of the queue and into the publishing
//'link' for whatever protocol is used; that would let protocol
//specific stuff be kept out the queue
-
if (broker::amqp_0_10::MessageTransfer::isImmediateDeliveryRequired(msg)
&& getConsumerCount() == 0) {
if (alternateExchange) {
DeliverableMessage deliverable(msg, 0);
alternateExchange->route(deliverable);
}
+ return false;
} else if (isLocal(msg)) {
//drop message
QPID_LOG(info, "Dropping 'local' message from " << getName());
+ return false;
} else if (isExcluded(msg)) {
//drop message
QPID_LOG(info, "Dropping excluded message from " << getName());
+ return false;
+ } else if (selector) {
+ return selector->filter(msg);
} else {
+ return true;
+ }
+}
+
+void Queue::deliver(Message msg, TxBuffer* txn)
+{
+ if (accept(msg)) {
if (txn) {
TxOp::shared_ptr op(new TxPublish(msg, shared_from_this()));
txn->enlist(op);
@@ -1178,6 +1195,10 @@ void tryAutoDeleteImpl(Broker& broker, Q
<< " user:" << userId
<< " rhost:" << connectionId );
queue->destroyed();
+ } else {
+ QPID_LOG_CAT(debug, model, "Auto-delete queue could not be deleted: " << queue->getName()
+ << " user:" << userId
+ << " rhost:" << connectionId );
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=1480239&r1=1480238&r2=1480239&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Wed May 8 11:55:56 2013
@@ -43,6 +43,7 @@
#include "qmf/org/apache/qpid/broker/Broker.h"
#include "qpid/framing/amqp_types.h"
+#include <boost/scoped_ptr.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/intrusive_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
@@ -66,6 +67,7 @@ class QueueDepth;
class QueueEvents;
class QueueRegistry;
class QueueFactory;
+class Selector;
class TransactionContext;
class TxBuffer;
class MessageDistributor;
@@ -165,8 +167,10 @@ class Queue : public boost::enable_share
UsageBarrier barrier;
boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
boost::shared_ptr<MessageDistributor> allocator;
+ boost::scoped_ptr<Selector> selector;
virtual void push(Message& msg, bool isRecovery=false);
+ bool accept(const Message&);
void process(Message& msg);
bool enqueue(TransactionContext* ctxt, Message& msg);
bool getNextMessage(Message& msg, Consumer::shared_ptr& c);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp?rev=1480239&r1=1480238&r2=1480239&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp Wed May 8 11:55:56 2013
@@ -57,6 +57,7 @@ const std::string FAIRSHARE_ALIAS("x-qpi
const std::string PAGING("qpid.paging");
const std::string MAX_PAGES("qpid.max_pages_loaded");
const std::string PAGE_FACTOR("qpid.page_factor");
+const std::string FILTER("qpid.filter");
const std::string LVQ_LEGACY("qpid.last_value_queue");
const std::string LVQ_LEGACY_KEY("qpid.LVQ_key");
@@ -202,6 +203,9 @@ bool QueueSettings::handle(const std::st
} else if (key == PAGE_FACTOR) {
pageFactor = value;
return true;
+ } else if (key == FILTER) {
+ filter = value.asString();
+ return true;
} else {
return false;
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.h?rev=1480239&r1=1480238&r2=1480239&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.h Wed May 8 11:55:56 2013
@@ -83,6 +83,8 @@ struct QueueSettings
uint64_t maxFileSize;
uint64_t maxFileCount;
+ std::string filter;
+
//yuck, yuck
qpid::framing::FieldTable storeSettings;
std::map<std::string, qpid::types::Variant> original;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1480239&r1=1480238&r2=1480239&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp Wed May 8 11:55:56 2013
@@ -272,6 +272,10 @@ void Session::setupOutgoing(pn_link_t* l
outgoing[link] = q;
} else if (node.exchange) {
QueueSettings settings(false, true);
+ if (filter.hasSelectorFilter()) {
+ settings.filter = filter.getSelectorFilter();
+ QPID_LOG(debug, "Selector specified for outgoing link from exchange " << node.exchange->getName() << ": " << settings.filter);
+ }
//TODO: populate settings from source details when available from engine
boost::shared_ptr<qpid::broker::Queue> queue
= broker.createQueue(name + qpid::types::Uuid(true).str(), settings, this, "", connection.getUserid(), connection.getId()).first;
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp?rev=1480239&r1=1480238&r2=1480239&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp Wed May 8 11:55:56 2013
@@ -95,6 +95,7 @@ const std::string X_SUBSCRIBE("x-subscri
const std::string X_BINDINGS("x-bindings");
const std::string SELECTOR("selector");
const std::string APACHE_SELECTOR("x-apache-selector");
+const std::string QPID_FILTER("qpid.filter");
const std::string EXCHANGE("exchange");
const std::string QUEUE("queue");
const std::string KEY("key");
@@ -523,6 +524,8 @@ Subscription::Subscription(const Address
{
(Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions);
(Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions);
+ std::string selector = Opt(address)/LINK/SELECTOR;
+ if (!selector.empty()) queueOptions.setString(QPID_FILTER, selector);
if (!address.getSubject().empty()) bindSubject(address.getSubject());
else if (linkBindings.empty()) bindAll();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]