# Timed auto-deletion for queues

## Status

Draft

## Summary

Allow auto-deleted queues to have a configurable delay before they are
actually deleted.

## Problem

Applications often need to dynamically create queues (e.g. reply
queues or subscription queues for pub-sub). Cleaning up these queues
when unused is important. Though applications could delete them, it is
safest to allow the broker to do so in the event of application
failure for example. However, it is also often desirable that the
broker does not immediately delete queues when they become eligible
for deletion (based on the AMQP defined rules for auto-delete). By
allowing a configurable delay, applications can failover -
i.e. reconnect if their connection is lost - and resume their use of a
particular queue. However if they do not do so within the configured
time, the broker can still protect itself and clean the queue up.

## Solution

Recognise a timeout value that can be passed in the arguments to
queue-declare. Instead of immediately deleting the queue when eligible
for doing so, set up a timed task to try to delete after the
configured delay. If the queue becomes used within that period again,
the delayed deletion will be cancelled.

## Rationale

Seemed like the simplest solution.

## Implementation Notes

Covered by QPID-3000.

I have a patch that implements this feature in the c++ broker
(attached). 

The implementations of the messaging client API should also be altered
to use this feature for reliable subscriptions. It will be directly
exposed already through the x-declare addressing option.

## Consequences

Should not affect the development (though the auto-deletion logic
could do with some refactoring), nor the release artefacts. The
feature will need to be documented (and tested from different
clients). 

It is configured via an argument in the queue-declare; the default
will be no delay giving the same behaviour as currently supported
meaning there will be no issues for backwards compatibility at the
broker.

The AMQP 0-10 specification states that for queues set to be
auto-deleted "the server will delete the message queue when all
clients have finished using it, or shortly thereafter." One view is
that this change would allow the definition of 'shortly after' to be
configurable; another is that allows strict observance of the
auto-delete proprty to be relaxed.

## Contributor-in-Charge

[email protected]

## Contributors

TBD

## Version

1.0
commit 9ed83b2562a919729dadbf7324afc12050464063
Author: Gordon Sim <gordon@GRST500.(none)>
Date:   Wed Jan 12 11:36:21 2011 +0000

    BZ585844: timed auto-delete

diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 3f2a74f..f910d57 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -72,6 +72,7 @@ const std::string qpidLastValueQueueNoBrowse("qpid.last_value_queue_no_browse");
 const std::string qpidPersistLastNode("qpid.persist_last_node");
 const std::string qpidVQMatchProperty("qpid.LVQ_key");
 const std::string qpidQueueEventGeneration("qpid.queue_event_generation");
+const std::string qpidAutoDeleteTimeout("qpid.auto_delete_timeout");
 //following feature is not ready for general use as it doesn't handle
 //the case where a message is enqueued on more than one queue well enough:
 const std::string qpidInsertSequenceNumbers("qpid.insert_sequence_numbers");
@@ -106,7 +107,8 @@ Queue::Queue(const string& _name, bool _autodelete,
     broker(b),
     deleted(false),
     barrier(*this),
-    prioritySlots(1)
+    prioritySlots(1),
+    autoDeleteTimeout(0)
 {
     if (parent != 0 && broker != 0) {
         ManagementAgent* agent = broker->getManagementAgent();
@@ -508,6 +510,10 @@ void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
     consumerCount++;
     if (mgmtObject != 0)
         mgmtObject->inc_consumerCount ();
+    //reset auto deletion timer if necessary
+    if (autoDeleteTimeout && autoDeleteTask) {
+        autoDeleteTask->cancel();
+    }
 }
 
 void Queue::cancel(Consumer::shared_ptr c){
@@ -781,7 +787,7 @@ uint32_t Queue::getConsumerCount() const
 bool Queue::canAutoDelete() const
 {
     Mutex::ScopedLock locker(consumerLock);
-    return autodelete && !consumerCount;
+    return autodelete && !consumerCount && !owner;
 }
 
 void Queue::clearLastNodeFailure()
@@ -1018,6 +1024,10 @@ void Queue::configure(const FieldTable& _settings, bool recovering)
     FieldTable::ValuePtr p =_settings.get(qpidInsertSequenceNumbers);
     if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>());
 
+    autoDeleteTimeout = getIntegerSetting(_settings, qpidAutoDeleteTimeout);
+    if (autoDeleteTimeout) 
+        QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout); 
+
     if (mgmtObject != 0)
         mgmtObject->set_arguments(ManagementAgent::toMap(_settings));
 
@@ -1148,15 +1158,46 @@ boost::shared_ptr<Exchange> Queue::getAlternateExchange()
     return alternateExchange;
 }
 
-void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue)
+void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue)
 {
     if (broker.getQueues().destroyIf(queue->getName(), 
                                      boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) {
+        QPID_LOG(debug, "Auto-deleting " << queue->getName());
         queue->unbind(broker.getExchanges(), queue);
         queue->destroy();
     }
 }
 
+struct AutoDeleteTask : qpid::sys::TimerTask
+{
+    Broker& broker;
+    Queue::shared_ptr queue;
+
+    AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime) 
+        : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion"), broker(b), queue(q) {}
+
+    void fire()
+    {
+        //need to detect case where queue was used after the task was
+        //created, but then became unused again before the task fired;
+        //in this case ignore this request as there will have already
+        //been a later task added
+        tryAutoDeleteImpl(broker, queue);
+    }
+};
+
+void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue)
+{
+    if (queue->autoDeleteTimeout && queue->canAutoDelete()) {
+        AbsTime time(now(), Duration(queue->autoDeleteTimeout * TIME_SEC));
+        queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, time));
+        broker.getClusterTimer().add(queue->autoDeleteTask);        
+        QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated");
+    } else {
+        tryAutoDeleteImpl(broker, queue);
+    }
+}
+
 bool Queue::isExclusiveOwner(const OwnershipToken* const o) const 
 { 
     Mutex::ScopedLock locker(ownershipLock);
@@ -1171,6 +1212,10 @@ void Queue::releaseExclusiveOwnership()
 
 bool Queue::setExclusiveOwner(const OwnershipToken* const o) 
 { 
+    //reset auto deletion timer if necessary
+    if (autoDeleteTimeout && autoDeleteTask) {
+        autoDeleteTask->cancel();
+    }
     Mutex::ScopedLock locker(ownershipLock);
     if (owner) {
         return false;
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 462c8f5..d5b4b3f 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -34,6 +34,7 @@
 
 #include "qpid/framing/FieldTable.h"
 #include "qpid/sys/Monitor.h"
+#include "qpid/sys/Timer.h"
 #include "qpid/management/Manageable.h"
 #include "qmf/org/apache/qpid/broker/Queue.h"
 #include "qpid/framing/amqp_types.h"
@@ -152,6 +153,8 @@ namespace qpid {
             UsageBarrier barrier;
             int prioritySlots;
             boost::shared_ptr<Fairshare> fairshare;
+            int autoDeleteTimeout;
+            boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
 
             void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
             void setPolicy(std::auto_ptr<QueuePolicy> policy);

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to