Author: gsim
Date: Wed Aug 28 12:41:23 2013
New Revision: 1518181

URL: http://svn.apache.org/r1518181
Log:
QPID-4948: enable browsing

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/general.py

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp?rev=1518181&r1=1518180&r2=1518181&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp Wed Aug 28 12:41:23 
2013
@@ -44,9 +44,10 @@ void Outgoing::wakeup()
     session.wakeup();
 }
 
-OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& 
source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, 
Session& session, qpid::sys::OutputControl& o, bool e, bool p)
+OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& 
source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, 
Session& session,
+                                     qpid::sys::OutputControl& o, 
SubscriptionType type, bool e, bool p)
     : Outgoing(broker, session, source, target, pn_link_name(l)),
-      Consumer(pn_link_name(l), /*FIXME*/CONSUMER),
+      Consumer(pn_link_name(l), type),
       exclusive(e),
       isControllingUser(p),
       queue(q), deliveries(5000), link(l), out(o),

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h?rev=1518181&r1=1518180&r2=1518181&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h Wed Aug 28 12:41:23 2013
@@ -88,7 +88,8 @@ class Outgoing : public ManagedOutgoingL
 class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, 
public boost::enable_shared_from_this<OutgoingFromQueue>
 {
   public:
-    OutgoingFromQueue(Broker&, const std::string& source, const std::string& 
target, boost::shared_ptr<Queue> q, pn_link_t* l, Session&, 
qpid::sys::OutputControl& o, bool exclusive, bool isControllingUser);
+    OutgoingFromQueue(Broker&, const std::string& source, const std::string& 
target, boost::shared_ptr<Queue> q, pn_link_t* l, Session&,
+                      qpid::sys::OutputControl& o, SubscriptionType type, bool 
exclusive, bool isControllingUser);
     void setSubjectFilter(const std::string&);
     void setSelectorFilter(const std::string&);
     void init();

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1518181&r1=1518180&r2=1518181&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp Wed Aug 28 12:41:23 
2013
@@ -36,6 +36,7 @@
 #include "qpid/broker/TopicExchange.h"
 #include "qpid/broker/FanOutExchange.h"
 #include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueCursor.h"
 #include "qpid/broker/Selector.h"
 #include "qpid/broker/TopicExchange.h"
 #include "qpid/broker/amqp/Filter.h"
@@ -319,7 +320,8 @@ void Session::setupOutgoing(pn_link_t* l
 
     if (node.queue) {
         authorise.outgoing(node.queue);
-        boost::shared_ptr<Outgoing> q(new 
OutgoingFromQueue(connection.getBroker(), name, target, node.queue, link, 
*this, out, false, node.properties.trackControllingLink()));
+        SubscriptionType type = pn_terminus_get_distribution_mode(source) == 
PN_DIST_MODE_COPY ? BROWSER : CONSUMER;
+        boost::shared_ptr<Outgoing> q(new 
OutgoingFromQueue(connection.getBroker(), name, target, node.queue, link, 
*this, out, type, false, node.properties.trackControllingLink()));
         q->init();
         filter.apply(q);
         outgoing[link] = q;
@@ -354,7 +356,7 @@ void Session::setupOutgoing(pn_link_t* l
         if (!shared) queue->setExclusiveOwner(this);
         authorise.outgoing(node.exchange, queue, filter);
         filter.bind(node.exchange, queue);
-        boost::shared_ptr<Outgoing> q(new 
OutgoingFromQueue(connection.getBroker(), name, target, queue, link, *this, 
out, !shared, false));
+        boost::shared_ptr<Outgoing> q(new 
OutgoingFromQueue(connection.getBroker(), name, target, queue, link, *this, 
out, CONSUMER, !shared, false));
         q->init();
         outgoing[link] = q;
     } else if (node.relay) {

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp?rev=1518181&r1=1518180&r2=1518181&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp Wed Aug 28 
12:41:23 2013
@@ -300,7 +300,6 @@ AddressHelper::AddressHelper(const Addre
     if (bind(address, MODE, mode)) {
         if (mode == BROWSE) {
             browse = true;
-            throw qpid::messaging::AddressError("Browse mode not yet supported 
over AMQP 1.0.");
         } else if (mode != CONSUME) {
             throw qpid::messaging::AddressError("Invalid value for mode; must 
be 'browse' or 'consume'.");
         }
@@ -560,7 +559,7 @@ void AddressHelper::configure(pn_terminu
     if (mode == FOR_RECEIVER) {
         if (timeout) pn_terminus_set_timeout(terminus, timeout);
         if (browse) {
-            //when PROTON-139 is resolved, set the required delivery-mode
+            pn_terminus_set_distribution_mode(terminus, PN_DIST_MODE_COPY);
         }
         //set filter(s):
         if (!filters.empty()) {

Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/general.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/general.py?rev=1518181&r1=1518180&r2=1518181&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/general.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/general.py Wed Aug 28 
12:41:23 2013
@@ -44,3 +44,25 @@ class GeneralTests (VersionTest):
         assert response.content == "response" and response.correlation_id == 
"a1", response
 
         self.ssn.acknowledge()
+
+
+    def test_browse(self):
+        snd = self.ssn.sender("#")
+        rcv = self.ssn.receiver("%s; {mode: browse}" % snd.target)
+
+        msgs = [Message(content=s, subject = s) for s in ['a','b','c','d']]
+
+        for m in msgs: snd.send(m)
+
+        for expected in msgs:
+            msg = rcv.fetch(0)
+            assert msg.content == expected.content
+            self.ssn.acknowledge(msg)
+        rcv.close()
+
+        rcv = self.ssn.receiver(snd.target)
+        for expected in msgs:
+            msg = rcv.fetch(0)
+            assert msg.content == expected.content
+            self.ssn.acknowledge(msg)
+



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to