Author: pmoravec
Date: Fri Mar  7 10:33:29 2014
New Revision: 1575225

URL: http://svn.apache.org/r1575225
Log:
QPID-5608: [amqp1.0] delete-on-close policy do not work for producers to 
exchanges

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=1575225&r1=1575224&r2=1575225&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Fri Mar  7 10:33:29 2014
@@ -444,12 +444,18 @@ void Exchange::incOtherUsers()
     Mutex::ScopedLock l(usersLock);
     otherUsers++;
 }
-void Exchange::decOtherUsers()
+void Exchange::decOtherUsers(bool isControllingLink=false)
 {
     Mutex::ScopedLock l(usersLock);
     assert(otherUsers);
     if (otherUsers) otherUsers--;
-    if (!inUse() && !hasBindings()) checkAutodelete();
+    if (autodelete) {
+        if (isControllingLink) {
+            if (broker) broker->getExchanges().destroy(name);
+        } else if (!inUse() && !hasBindings()) {
+            checkAutoDelete();
+        }
+    }
 }
 bool Exchange::inUse() const
 {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h?rev=1575225&r1=1575224&r2=1575225&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h Fri Mar  7 10:33:29 2014
@@ -192,7 +192,7 @@ public:
     QPID_BROKER_EXTERN bool inUseAsAlternate();
 
     QPID_BROKER_EXTERN void incOtherUsers();
-    QPID_BROKER_EXTERN void decOtherUsers();
+    QPID_BROKER_EXTERN void decOtherUsers(bool isControllingLink);
     QPID_BROKER_EXTERN bool inUse() const;
 
     virtual std::string getType() const = 0;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1575225&r1=1575224&r2=1575225&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp Fri Mar  7 10:33:29 
2014
@@ -185,19 +185,20 @@ class IncomingToQueue : public DecodingI
 class IncomingToExchange : public DecodingIncoming
 {
   public:
-    IncomingToExchange(Broker& b, Session& p, 
boost::shared_ptr<qpid::broker::Exchange> e, pn_link_t* l, const std::string& 
source)
-        : DecodingIncoming(l, b, p, source, e->getName(), pn_link_name(l)), 
exchange(e), authorise(p.getAuthorise())
+    IncomingToExchange(Broker& b, Session& p, 
boost::shared_ptr<qpid::broker::Exchange> e, pn_link_t* l, const std::string& 
source, bool icl)
+        : DecodingIncoming(l, b, p, source, e->getName(), pn_link_name(l)), 
exchange(e), authorise(p.getAuthorise()), isControllingLink(icl)
     {
         exchange->incOtherUsers();
     }
     ~IncomingToExchange()
     {
-        exchange->decOtherUsers();
+        exchange->decOtherUsers(isControllingLink);
     }
     void handle(qpid::broker::Message& m);
   private:
     boost::shared_ptr<qpid::broker::Exchange> exchange;
     Authorise& authorise;
+    bool isControllingLink;
 };
 
 Session::Session(pn_session_t* s, Connection& c, qpid::sys::OutputControl& o)
@@ -425,7 +426,7 @@ void Session::setupIncoming(pn_link_t* l
         boost::shared_ptr<Incoming> q(new 
IncomingToQueue(connection.getBroker(), *this, node.queue, link, source, 
node.created && node.properties.trackControllingLink()));
         incoming[link] = q;
     } else if (node.exchange) {
-        boost::shared_ptr<Incoming> e(new 
IncomingToExchange(connection.getBroker(), *this, node.exchange, link, source));
+        boost::shared_ptr<Incoming> e(new 
IncomingToExchange(connection.getBroker(), *this, node.exchange, link, source, 
node.created && node.properties.trackControllingLink()));
         incoming[link] = e;
     } else if (node.relay) {
         boost::shared_ptr<Incoming> in(new IncomingToRelay(link, 
connection.getBroker(), *this, source, name, pn_link_name(link), node.relay));
@@ -717,6 +718,8 @@ void IncomingToQueue::handle(qpid::broke
 
 void IncomingToExchange::handle(qpid::broker::Message& message)
 {
+    if (exchange->isDestroyed())
+        throw qpid::framing::ResourceDeletedException(QPID_MSG("Exchange " << 
exchange->getName() << " has been deleted."));
     authorise.route(exchange, message);
     DeliverableMessage deliverable(message, 0);
     exchange->route(deliverable);



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to