Author: gsim
Date: Wed Aug 28 12:41:23 2013
New Revision: 1518181
URL: http://svn.apache.org/r1518181
Log:
QPID-4948: enable browsing
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/general.py
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp?rev=1518181&r1=1518180&r2=1518181&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp Wed Aug 28 12:41:23 2013
@@ -44,9 +44,10 @@ void Outgoing::wakeup()
session.wakeup();
}
-OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session& session, qpid::sys::OutputControl& o, bool e, bool p)
+OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session& session,
+ qpid::sys::OutputControl& o, SubscriptionType type, bool e, bool p)
: Outgoing(broker, session, source, target, pn_link_name(l)),
- Consumer(pn_link_name(l), /*FIXME*/CONSUMER),
+ Consumer(pn_link_name(l), type),
exclusive(e),
isControllingUser(p),
queue(q), deliveries(5000), link(l), out(o),
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h?rev=1518181&r1=1518180&r2=1518181&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h Wed Aug 28 12:41:23 2013
@@ -88,7 +88,8 @@ class Outgoing : public ManagedOutgoingL
class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer,
public boost::enable_shared_from_this<OutgoingFromQueue>
{
public:
- OutgoingFromQueue(Broker&, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session&, qpid::sys::OutputControl& o, bool exclusive, bool isControllingUser);
+ OutgoingFromQueue(Broker&, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session&,
+ qpid::sys::OutputControl& o, SubscriptionType type, bool exclusive, bool isControllingUser);
void setSubjectFilter(const std::string&);
void setSelectorFilter(const std::string&);
void init();
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1518181&r1=1518180&r2=1518181&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp Wed Aug 28 12:41:23 2013
@@ -36,6 +36,7 @@
#include "qpid/broker/TopicExchange.h"
#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueCursor.h"
#include "qpid/broker/Selector.h"
#include "qpid/broker/TopicExchange.h"
#include "qpid/broker/amqp/Filter.h"
@@ -319,7 +320,8 @@ void Session::setupOutgoing(pn_link_t* l
if (node.queue) {
authorise.outgoing(node.queue);
- boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, node.queue, link, *this, out, false, node.properties.trackControllingLink()));
+ SubscriptionType type = pn_terminus_get_distribution_mode(source) == PN_DIST_MODE_COPY ? BROWSER : CONSUMER;
+ boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, node.queue, link, *this, out, type, false, node.properties.trackControllingLink()));
q->init();
filter.apply(q);
outgoing[link] = q;
@@ -354,7 +356,7 @@ void Session::setupOutgoing(pn_link_t* l
if (!shared) queue->setExclusiveOwner(this);
authorise.outgoing(node.exchange, queue, filter);
filter.bind(node.exchange, queue);
- boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, queue, link, *this, out, !shared, false));
+ boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, queue, link, *this, out, CONSUMER, !shared, false));
q->init();
outgoing[link] = q;
} else if (node.relay) {
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp?rev=1518181&r1=1518180&r2=1518181&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp Wed Aug 28 12:41:23 2013
@@ -300,7 +300,6 @@ AddressHelper::AddressHelper(const Addre
if (bind(address, MODE, mode)) {
if (mode == BROWSE) {
browse = true;
- throw qpid::messaging::AddressError("Browse mode not yet supported over AMQP 1.0.");
} else if (mode != CONSUME) {
throw qpid::messaging::AddressError("Invalid value for mode; must be 'browse' or 'consume'.");
}
@@ -560,7 +559,7 @@ void AddressHelper::configure(pn_terminu
if (mode == FOR_RECEIVER) {
if (timeout) pn_terminus_set_timeout(terminus, timeout);
if (browse) {
- //when PROTON-139 is resolved, set the required delivery-mode
+ pn_terminus_set_distribution_mode(terminus, PN_DIST_MODE_COPY);
}
//set filter(s):
if (!filters.empty()) {
Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/general.py
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/general.py?rev=1518181&r1=1518180&r2=1518181&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/general.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/general.py Wed Aug 28 12:41:23 2013
@@ -44,3 +44,25 @@ class GeneralTests (VersionTest):
assert response.content == "response" and response.correlation_id == "a1", response
self.ssn.acknowledge()
+
+
+ def test_browse(self):
+ snd = self.ssn.sender("#")
+ rcv = self.ssn.receiver("%s; {mode: browse}" % snd.target)
+
+ msgs = [Message(content=s, subject = s) for s in ['a','b','c','d']]
+
+ for m in msgs: snd.send(m)
+
+ for expected in msgs:
+ msg = rcv.fetch(0)
+ assert msg.content == expected.content
+ self.ssn.acknowledge(msg)
+ rcv.close()
+
+ rcv = self.ssn.receiver(snd.target)
+ for expected in msgs:
+ msg = rcv.fetch(0)
+ assert msg.content == expected.content
+ self.ssn.acknowledge(msg)
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]