Author: pmoravec
Date: Fri Mar 7 10:33:29 2014
New Revision: 1575225
URL: http://svn.apache.org/r1575225
Log:
QPID-5608: [amqp1.0] delete-on-close policy does not work for producers to
exchanges
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=1575225&r1=1575224&r2=1575225&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Fri Mar 7 10:33:29 2014
@@ -444,12 +444,18 @@ void Exchange::incOtherUsers()
Mutex::ScopedLock l(usersLock);
otherUsers++;
}
-void Exchange::decOtherUsers()
+void Exchange::decOtherUsers(bool isControllingLink=false)
{
Mutex::ScopedLock l(usersLock);
assert(otherUsers);
if (otherUsers) otherUsers--;
- if (!inUse() && !hasBindings()) checkAutodelete();
+ if (autodelete) {
+ if (isControllingLink) {
+ if (broker) broker->getExchanges().destroy(name);
+ } else if (!inUse() && !hasBindings()) {
+ checkAutodelete();
+ }
+ }
}
bool Exchange::inUse() const
{
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h?rev=1575225&r1=1575224&r2=1575225&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h Fri Mar 7 10:33:29 2014
@@ -192,7 +192,7 @@ public:
QPID_BROKER_EXTERN bool inUseAsAlternate();
QPID_BROKER_EXTERN void incOtherUsers();
- QPID_BROKER_EXTERN void decOtherUsers();
+ QPID_BROKER_EXTERN void decOtherUsers(bool isControllingLink);
QPID_BROKER_EXTERN bool inUse() const;
virtual std::string getType() const = 0;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1575225&r1=1575224&r2=1575225&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp Fri Mar 7 10:33:29 2014
@@ -185,19 +185,20 @@ class IncomingToQueue : public DecodingI
class IncomingToExchange : public DecodingIncoming
{
public:
- IncomingToExchange(Broker& b, Session& p,
boost::shared_ptr<qpid::broker::Exchange> e, pn_link_t* l, const std::string&
source)
- : DecodingIncoming(l, b, p, source, e->getName(), pn_link_name(l)),
exchange(e), authorise(p.getAuthorise())
+ IncomingToExchange(Broker& b, Session& p,
boost::shared_ptr<qpid::broker::Exchange> e, pn_link_t* l, const std::string&
source, bool icl)
+ : DecodingIncoming(l, b, p, source, e->getName(), pn_link_name(l)),
exchange(e), authorise(p.getAuthorise()), isControllingLink(icl)
{
exchange->incOtherUsers();
}
~IncomingToExchange()
{
- exchange->decOtherUsers();
+ exchange->decOtherUsers(isControllingLink);
}
void handle(qpid::broker::Message& m);
private:
boost::shared_ptr<qpid::broker::Exchange> exchange;
Authorise& authorise;
+ bool isControllingLink;
};
Session::Session(pn_session_t* s, Connection& c, qpid::sys::OutputControl& o)
@@ -425,7 +426,7 @@ void Session::setupIncoming(pn_link_t* l
boost::shared_ptr<Incoming> q(new
IncomingToQueue(connection.getBroker(), *this, node.queue, link, source,
node.created && node.properties.trackControllingLink()));
incoming[link] = q;
} else if (node.exchange) {
- boost::shared_ptr<Incoming> e(new
IncomingToExchange(connection.getBroker(), *this, node.exchange, link, source));
+ boost::shared_ptr<Incoming> e(new
IncomingToExchange(connection.getBroker(), *this, node.exchange, link, source,
node.created && node.properties.trackControllingLink()));
incoming[link] = e;
} else if (node.relay) {
boost::shared_ptr<Incoming> in(new IncomingToRelay(link,
connection.getBroker(), *this, source, name, pn_link_name(link), node.relay));
@@ -717,6 +718,8 @@ void IncomingToQueue::handle(qpid::broke
void IncomingToExchange::handle(qpid::broker::Message& message)
{
+ if (exchange->isDestroyed())
+ throw qpid::framing::ResourceDeletedException(QPID_MSG("Exchange " <<
exchange->getName() << " has been deleted."));
authorise.route(exchange, message);
DeliverableMessage deliverable(message, 0);
exchange->route(deliverable);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]