Author: gsim
Date: Wed Dec 17 14:29:44 2014
New Revision: 1646260
URL: http://svn.apache.org/r1646260
Log:
QPID-6274: Delete subscription queue immediately on link close
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.h
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Relay.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Relay.h
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1646260&r1=1646259&r2=1646260&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Wed Dec 17 14:29:44 2014
@@ -1300,10 +1300,10 @@ struct AutoDeleteTask : qpid::sys::Timer
}
};
-void Queue::scheduleAutoDelete()
+void Queue::scheduleAutoDelete(bool immediate)
{
if (canAutoDelete()) {
- if (settings.autoDeleteDelay) {
+ if (!immediate && settings.autoDeleteDelay) {
AbsTime time(now(), Duration(settings.autoDeleteDelay * TIME_SEC));
autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(shared_from_this(), time));
broker->getTimer().add(autoDeleteTask);
@@ -1343,7 +1343,7 @@ bool Queue::isExclusiveOwner(const Owner
return o == owner;
}
-void Queue::releaseExclusiveOwnership()
+void Queue::releaseExclusiveOwnership(bool immediateExpiry)
{
bool unused;
{
@@ -1355,7 +1355,7 @@ void Queue::releaseExclusiveOwnership()
unused = !users.isUsed();
}
if (unused && settings.autodelete) {
- scheduleAutoDelete();
+ scheduleAutoDelete(immediateExpiry);
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=1646260&r1=1646259&r2=1646260&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Wed Dec 17 14:29:44 2014
@@ -379,7 +379,7 @@ class Queue : public boost::enable_share
QPID_BROKER_EXTERN uint32_t getConsumerCount() const;
inline const std::string& getName() const { return name; }
QPID_BROKER_EXTERN bool isExclusiveOwner(const OwnershipToken* const o) const;
- QPID_BROKER_EXTERN void releaseExclusiveOwnership();
+ QPID_BROKER_EXTERN void releaseExclusiveOwnership(bool immediateExpiry=false);
QPID_BROKER_EXTERN bool setExclusiveOwner(const OwnershipToken* const o);
QPID_BROKER_EXTERN bool hasExclusiveConsumer() const;
QPID_BROKER_EXTERN bool hasExclusiveOwner() const;
@@ -389,7 +389,7 @@ class Queue : public boost::enable_share
inline bool isAutoDelete() const { return settings.autodelete; }
inline bool isBrowseOnly() const { return settings.isBrowseOnly; }
QPID_BROKER_EXTERN bool canAutoDelete() const;
- QPID_BROKER_EXTERN void scheduleAutoDelete();
+ QPID_BROKER_EXTERN void scheduleAutoDelete(bool immediate=false);
QPID_BROKER_EXTERN bool isDeleted() const;
const QueueBindings& getBindings() const { return bindings; }
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp?rev=1646260&r1=1646259&r2=1646260&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp Wed Dec 17 14:29:44 2014
@@ -57,7 +57,7 @@ uint32_t Incoming::getCredit()
return credit;//TODO: proper flow control
}
-void Incoming::detached()
+void Incoming::detached(bool /*closed*/)
{
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.h?rev=1646260&r1=1646259&r2=1646260&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.h Wed Dec 17 14:29:44 2014
@@ -42,7 +42,7 @@ class Incoming : public ManagedIncomingL
virtual ~Incoming();
virtual bool doWork();//do anything that requires output
virtual bool haveWork();//called when handling input to see whether any output work is needed
- virtual void detached();
+ virtual void detached(bool closed);
virtual void readable(pn_delivery_t* delivery) = 0;
void verify(const std::string& userid, const std::string& defaultRealm);
void wakeup();
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp?rev=1646260&r1=1646259&r2=1646260&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp Wed Dec 17 14:29:44 2014
@@ -156,7 +156,7 @@ bool OutgoingFromQueue::canDeliver()
return deliveries[current].delivery == 0 && pn_link_credit(link);
}
-void OutgoingFromQueue::detached()
+void OutgoingFromQueue::detached(bool closed)
{
QPID_LOG(debug, "Detaching outgoing link " << getName() << " from " << queue->getName());
queue->cancel(shared_from_this());
@@ -164,12 +164,14 @@ void OutgoingFromQueue::detached()
for (size_t i = 0 ; i < deliveries.capacity(); ++i) {
if (deliveries[i].msg) queue->release(deliveries[i].cursor, true);
}
- if (exclusive) queue->releaseExclusiveOwnership();
- else if (isControllingUser) queue->releaseFromUse(true);
+ if (exclusive) {
+ queue->releaseExclusiveOwnership(closed);
+ } else if (isControllingUser) {
+ queue->releaseFromUse(true);
+ }
cancelled = true;
}
-
OutgoingFromQueue::~OutgoingFromQueue()
{
if (!cancelled && isControllingUser) queue->releaseFromUse(true);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h?rev=1646260&r1=1646259&r2=1646260&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h Wed Dec 17 14:29:44 2014
@@ -70,7 +70,7 @@ class Outgoing : public ManagedOutgoingL
/**
* Signals that this link has been detached
*/
- virtual void detached() = 0;
+ virtual void detached(bool closed) = 0;
/**
* Called when a delivery is writable
*/
@@ -98,7 +98,7 @@ class OutgoingFromQueue : public Outgoin
void write(const char* data, size_t size);
void handle(pn_delivery_t* delivery);
bool canDeliver();
- void detached();
+ void detached(bool closed);
//Consumer interface:
bool deliver(const QueueCursor& cursor, const qpid::broker::Message& msg);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Relay.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Relay.cpp?rev=1646260&r1=1646259&r2=1646260&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Relay.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Relay.cpp Wed Dec 17 14:29:44 2014
@@ -163,7 +163,7 @@ void OutgoingFromRelay::handle(pn_delive
/**
* Signals that this link has been detached
*/
-void OutgoingFromRelay::detached()
+void OutgoingFromRelay::detached(bool /*closed*/)
{
relay->detached(this);
}
@@ -221,7 +221,7 @@ uint32_t IncomingToRelay::getCredit()
return relay->getCredit();
}
-void IncomingToRelay::detached()
+void IncomingToRelay::detached(bool /*closed*/)
{
relay->detached(this);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Relay.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Relay.h?rev=1646260&r1=1646259&r2=1646260&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Relay.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Relay.h Wed Dec 17 14:29:44 2014
@@ -100,7 +100,7 @@ class OutgoingFromRelay : public Outgoin
const std::string& target, const std::string& name,
boost::shared_ptr<Relay>);
bool doWork();
void handle(pn_delivery_t* delivery);
- void detached();
+ void detached(bool closed);
void init();
void setSubjectFilter(const std::string&);
void setSelectorFilter(const std::string&);
@@ -118,7 +118,7 @@ class IncomingToRelay : public Incoming
bool settle();
bool doWork();
bool haveWork();
- void detached();
+ void detached(bool closed);
void readable(pn_delivery_t* delivery);
uint32_t getCredit();
private:
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1646260&r1=1646259&r2=1646260&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp Wed Dec 17 14:29:44 2014
@@ -577,7 +577,7 @@ void Session::detach(pn_link_t* link)
if (pn_link_is_sender(link)) {
OutgoingLinks::iterator i = outgoing.find(link);
if (i != outgoing.end()) {
- i->second->detached();
+ i->second->detached(true/*TODO: check whether actually closed; see PROTON-773*/);
boost::shared_ptr<Queue> q = OutgoingFromQueue::getExclusiveSubscriptionQueue(i->second.get());
if (q && !q->isAutoDelete() && !q->isDeleted()) {
connection.getBroker().deleteQueue(q->getName(), connection.getUserId(), connection.getMgmtId());
@@ -588,7 +588,7 @@ void Session::detach(pn_link_t* link)
} else {
IncomingLinks::iterator i = incoming.find(link);
if (i != incoming.end()) {
- i->second->detached();
+ i->second->detached(true/*TODO: check whether actually closed; see PROTON-773*/);
incoming.erase(i);
QPID_LOG(debug, "Incoming link detached");
}
@@ -653,7 +653,7 @@ bool Session::dispatch()
pn_condition_set_name(error, e.symbol());
pn_condition_set_description(error, e.what());
pn_link_close(s->first);
- s->second->detached();
+ s->second->detached(true);
outgoing.erase(s++);
output = true;
}
@@ -678,7 +678,7 @@ bool Session::dispatch()
pn_condition_set_name(error, e.symbol());
pn_condition_set_description(error, e.what());
pn_link_close(i->first);
- i->second->detached();
+ i->second->detached(true);
incoming.erase(i++);
output = true;
}
@@ -690,10 +690,10 @@ bool Session::dispatch()
void Session::close()
{
for (OutgoingLinks::iterator i = outgoing.begin(); i != outgoing.end(); ++i) {
- i->second->detached();
+ i->second->detached(false);
}
for (IncomingLinks::iterator i = incoming.begin(); i != incoming.end(); ++i) {
- i->second->detached();
+ i->second->detached(false);
}
outgoing.clear();
incoming.clear();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]