Author: gsim
Date: Wed Dec 17 14:29:44 2014
New Revision: 1646260

URL: http://svn.apache.org/r1646260
Log:
QPID-6274: Delete subscription queue immediately on link close

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Relay.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Relay.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1646260&r1=1646259&r2=1646260&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Wed Dec 17 14:29:44 2014
@@ -1300,10 +1300,10 @@ struct AutoDeleteTask : qpid::sys::Timer
     }
 };
 
-void Queue::scheduleAutoDelete()
+void Queue::scheduleAutoDelete(bool immediate)
 {
     if (canAutoDelete()) {
-        if (settings.autoDeleteDelay) {
+        if (!immediate && settings.autoDeleteDelay) {
             AbsTime time(now(), Duration(settings.autoDeleteDelay * TIME_SEC));
             autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(shared_from_this(), time));
             broker->getTimer().add(autoDeleteTask);
@@ -1343,7 +1343,7 @@ bool Queue::isExclusiveOwner(const Owner
     return o == owner;
 }
 
-void Queue::releaseExclusiveOwnership()
+void Queue::releaseExclusiveOwnership(bool immediateExpiry)
 {
     bool unused;
     {
@@ -1355,7 +1355,7 @@ void Queue::releaseExclusiveOwnership()
         unused = !users.isUsed();
     }
     if (unused && settings.autodelete) {
-        scheduleAutoDelete();
+        scheduleAutoDelete(immediateExpiry);
     }
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=1646260&r1=1646259&r2=1646260&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Wed Dec 17 14:29:44 2014
@@ -379,7 +379,7 @@ class Queue : public boost::enable_share
     QPID_BROKER_EXTERN uint32_t getConsumerCount() const;
     inline const std::string& getName() const { return name; }
     QPID_BROKER_EXTERN bool isExclusiveOwner(const OwnershipToken* const o) const;
-    QPID_BROKER_EXTERN void releaseExclusiveOwnership();
+    QPID_BROKER_EXTERN void releaseExclusiveOwnership(bool immediateExpiry=false);
     QPID_BROKER_EXTERN bool setExclusiveOwner(const OwnershipToken* const o);
     QPID_BROKER_EXTERN bool hasExclusiveConsumer() const;
     QPID_BROKER_EXTERN bool hasExclusiveOwner() const;
@@ -389,7 +389,7 @@ class Queue : public boost::enable_share
     inline bool isAutoDelete() const { return settings.autodelete; }
     inline bool isBrowseOnly() const { return settings.isBrowseOnly; }
     QPID_BROKER_EXTERN bool canAutoDelete() const;
-    QPID_BROKER_EXTERN void scheduleAutoDelete();
+    QPID_BROKER_EXTERN void scheduleAutoDelete(bool immediate=false);
     QPID_BROKER_EXTERN bool isDeleted() const;
     const QueueBindings& getBindings() const { return bindings; }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp?rev=1646260&r1=1646259&r2=1646260&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp Wed Dec 17 14:29:44 2014
@@ -57,7 +57,7 @@ uint32_t Incoming::getCredit()
     return credit;//TODO: proper flow control
 }
 
-void Incoming::detached()
+void Incoming::detached(bool /*closed*/)
 {
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.h?rev=1646260&r1=1646259&r2=1646260&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.h Wed Dec 17 14:29:44 2014
@@ -42,7 +42,7 @@ class Incoming : public ManagedIncomingL
     virtual ~Incoming();
     virtual bool doWork();//do anything that requires output
     virtual bool haveWork();//called when handling input to see whether any output work is needed
-    virtual void detached();
+    virtual void detached(bool closed);
     virtual void readable(pn_delivery_t* delivery) = 0;
     void verify(const std::string& userid, const std::string& defaultRealm);
     void wakeup();

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp?rev=1646260&r1=1646259&r2=1646260&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp Wed Dec 17 14:29:44 2014
@@ -156,7 +156,7 @@ bool OutgoingFromQueue::canDeliver()
     return deliveries[current].delivery == 0 && pn_link_credit(link);
 }
 
-void OutgoingFromQueue::detached()
+void OutgoingFromQueue::detached(bool closed)
 {
     QPID_LOG(debug, "Detaching outgoing link " << getName() << " from " << queue->getName());
     queue->cancel(shared_from_this());
@@ -164,12 +164,14 @@ void OutgoingFromQueue::detached()
     for (size_t i = 0 ; i < deliveries.capacity(); ++i) {
         if (deliveries[i].msg) queue->release(deliveries[i].cursor, true);
     }
-    if (exclusive) queue->releaseExclusiveOwnership();
-    else if (isControllingUser) queue->releaseFromUse(true);
+    if (exclusive) {
+        queue->releaseExclusiveOwnership(closed);
+    } else if (isControllingUser) {
+        queue->releaseFromUse(true);
+    }
     cancelled = true;
 }
 
-
 OutgoingFromQueue::~OutgoingFromQueue()
 {
     if (!cancelled && isControllingUser) queue->releaseFromUse(true);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h?rev=1646260&r1=1646259&r2=1646260&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h Wed Dec 17 14:29:44 2014
@@ -70,7 +70,7 @@ class Outgoing : public ManagedOutgoingL
     /**
      * Signals that this link has been detached
      */
-    virtual void detached() = 0;
+    virtual void detached(bool closed) = 0;
     /**
      * Called when a delivery is writable
      */
@@ -98,7 +98,7 @@ class OutgoingFromQueue : public Outgoin
     void write(const char* data, size_t size);
     void handle(pn_delivery_t* delivery);
     bool canDeliver();
-    void detached();
+    void detached(bool closed);
 
     //Consumer interface:
     bool deliver(const QueueCursor& cursor, const qpid::broker::Message& msg);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Relay.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Relay.cpp?rev=1646260&r1=1646259&r2=1646260&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Relay.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Relay.cpp Wed Dec 17 14:29:44 2014
@@ -163,7 +163,7 @@ void OutgoingFromRelay::handle(pn_delive
 /**
  * Signals that this link has been detached
  */
-void OutgoingFromRelay::detached()
+void OutgoingFromRelay::detached(bool /*closed*/)
 {
     relay->detached(this);
 }
@@ -221,7 +221,7 @@ uint32_t IncomingToRelay::getCredit()
     return relay->getCredit();
 }
 
-void IncomingToRelay::detached()
+void IncomingToRelay::detached(bool /*closed*/)
 {
     relay->detached(this);
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Relay.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Relay.h?rev=1646260&r1=1646259&r2=1646260&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Relay.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Relay.h Wed Dec 17 14:29:44 2014
@@ -100,7 +100,7 @@ class OutgoingFromRelay : public Outgoin
                       const std::string& target, const std::string& name, boost::shared_ptr<Relay>);
     bool doWork();
     void handle(pn_delivery_t* delivery);
-    void detached();
+    void detached(bool closed);
     void init();
     void setSubjectFilter(const std::string&);
     void setSelectorFilter(const std::string&);
@@ -118,7 +118,7 @@ class IncomingToRelay : public Incoming
     bool settle();
     bool doWork();
     bool haveWork();
-    void detached();
+    void detached(bool closed);
     void readable(pn_delivery_t* delivery);
     uint32_t getCredit();
   private:

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1646260&r1=1646259&r2=1646260&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp Wed Dec 17 14:29:44 2014
@@ -577,7 +577,7 @@ void Session::detach(pn_link_t* link)
     if (pn_link_is_sender(link)) {
         OutgoingLinks::iterator i = outgoing.find(link);
         if (i != outgoing.end()) {
-            i->second->detached();
+            i->second->detached(true/*TODO: checked whether actually closed; see PROTON-773*/);
             boost::shared_ptr<Queue> q = OutgoingFromQueue::getExclusiveSubscriptionQueue(i->second.get());
             if (q && !q->isAutoDelete() && !q->isDeleted()) {
                 connection.getBroker().deleteQueue(q->getName(), connection.getUserId(), connection.getMgmtId());
@@ -588,7 +588,7 @@ void Session::detach(pn_link_t* link)
     } else {
         IncomingLinks::iterator i = incoming.find(link);
         if (i != incoming.end()) {
-            i->second->detached();
+            i->second->detached(true/*TODO: checked whether actually closed; see PROTON-773*/);
             incoming.erase(i);
             QPID_LOG(debug, "Incoming link detached");
         }
@@ -653,7 +653,7 @@ bool Session::dispatch()
             pn_condition_set_name(error, e.symbol());
             pn_condition_set_description(error, e.what());
             pn_link_close(s->first);
-            s->second->detached();
+            s->second->detached(true);
             outgoing.erase(s++);
             output = true;
         }
@@ -678,7 +678,7 @@ bool Session::dispatch()
             pn_condition_set_name(error, e.symbol());
             pn_condition_set_description(error, e.what());
             pn_link_close(i->first);
-            i->second->detached();
+            i->second->detached(true);
             incoming.erase(i++);
             output = true;
         }
@@ -690,10 +690,10 @@ bool Session::dispatch()
 void Session::close()
 {
     for (OutgoingLinks::iterator i = outgoing.begin(); i != outgoing.end(); ++i) {
-        i->second->detached();
+        i->second->detached(false);
     }
     for (IncomingLinks::iterator i = incoming.begin(); i != incoming.end(); ++i) {
-        i->second->detached();
+        i->second->detached(false);
     }
     outgoing.clear();
     incoming.clear();



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to