Author: gsim
Date: Fri Jun 22 13:18:05 2012
New Revision: 1352874

URL: http://svn.apache.org/viewvc?rev=1352874&view=rev
Log:
QPID-4075: Raise delete event for autodeleted queues also

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h
    qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1352874&r1=1352873&r2=1352874&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Fri Jun 22 13:18:05 2012
@@ -49,6 +49,7 @@
 #include "qpid/types/Variant.h"
 #include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h"
 #include "qmf/org/apache/qpid/broker/ArgsQueueReroute.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
 
 #include <iostream>
 #include <algorithm>
@@ -1484,12 +1485,15 @@ boost::shared_ptr<Exchange> Queue::getAl
     return alternateExchange;
 }
 
-void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue)
+void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue, const 
std::string& connectionId, const std::string& userId)
 {
     if (broker.getQueues().destroyIf(queue->getName(),
                                      
boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) {
         QPID_LOG(debug, "Auto-deleting " << queue->getName());
         queue->destroyed();
+
+        if (broker.getManagementAgent())
+            
broker.getManagementAgent()->raiseEvent(_qmf::EventQueueDelete(connectionId, 
userId, queue->getName()));
     }
 }
 
@@ -1497,9 +1501,11 @@ struct AutoDeleteTask : qpid::sys::Timer
 {
     Broker& broker;
     Queue::shared_ptr queue;
+    std::string connectionId;
+    std::string userId;
 
-    AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime)
-        : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion:"+q->getName()), 
broker(b), queue(q) {}
+    AutoDeleteTask(Broker& b, Queue::shared_ptr q, const std::string& cId, 
const std::string& uId, AbsTime fireTime)
+        : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion:"+q->getName()), 
broker(b), queue(q), connectionId(cId), userId(uId) {}
 
     void fire()
     {
@@ -1507,19 +1513,19 @@ struct AutoDeleteTask : qpid::sys::Timer
         //created, but then became unused again before the task fired;
         //in this case ignore this request as there will have already
         //been a later task added
-        tryAutoDeleteImpl(broker, queue);
+        tryAutoDeleteImpl(broker, queue, connectionId, userId);
     }
 };
 
-void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue)
+void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue, const 
std::string& connectionId, const std::string& userId)
 {
     if (queue->autoDeleteTimeout && queue->canAutoDelete()) {
         AbsTime time(now(), Duration(queue->autoDeleteTimeout * TIME_SEC));
-        queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new 
AutoDeleteTask(broker, queue, time));
+        queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new 
AutoDeleteTask(broker, queue, connectionId, userId, time));
         broker.getClusterTimer().add(queue->autoDeleteTask);
         QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " 
initiated");
     } else {
-        tryAutoDeleteImpl(broker, queue);
+        tryAutoDeleteImpl(broker, queue, connectionId, userId);
     }
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=1352874&r1=1352873&r2=1352874&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Fri Jun 22 13:18:05 2012
@@ -344,7 +344,7 @@ class Queue : public boost::enable_share
      * exclusive owner
      */
     static Queue::shared_ptr restore(QueueRegistry& queues, framing::Buffer& 
buffer);
-    static void tryAutoDelete(Broker& broker, Queue::shared_ptr);
+    static void tryAutoDelete(Broker& broker, Queue::shared_ptr, const 
std::string& connectionId, const std::string& userId);
 
     virtual void setExternalQueueStore(ExternalQueueStore* inst);
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1352874&r1=1352873&r2=1352874&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri Jun 22 13:18:05 
2012
@@ -72,7 +72,8 @@ SemanticState::SemanticState(DeliveryAda
       dtxSelected(false),
       authMsg(getSession().getBroker().getOptions().auth && 
!getSession().getConnection().isUserProxyAuth()),
       userID(getSession().getConnection().getUserId()),
-      closeComplete(false)
+      closeComplete(false),
+      connectionId(getSession().getConnection().getUrl())
 {}
 
 SemanticState::~SemanticState() {
@@ -428,7 +429,7 @@ void SemanticState::cancel(ConsumerImpl:
     if(queue) {
         queue->cancel(c);
         if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) {
-            Queue::tryAutoDelete(session.getBroker(), queue);
+            Queue::tryAutoDelete(session.getBroker(), queue, connectionId, 
userID);
         }
     }
     c->cancel();

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=1352874&r1=1352873&r2=1352874&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Fri Jun 22 13:18:05 2012
@@ -182,6 +182,8 @@ class SemanticState : private boost::non
     const bool authMsg;
     const std::string userID;
     bool closeComplete;
+    //needed for queue delete events in auto-delete:
+    const std::string connectionId;
 
     void route(boost::intrusive_ptr<Message> msg, Deliverable& strategy);
     void checkDtxTimeout();

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1352874&r1=1352873&r2=1352874&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Fri Jun 22 13:18:05 
2012
@@ -206,7 +206,10 @@ ExchangeBoundResult SessionAdapter::Exch
     }
 }
 
-SessionAdapter::QueueHandlerImpl::QueueHandlerImpl(SemanticState& session) : 
HandlerHelper(session), broker(getBroker())
+SessionAdapter::QueueHandlerImpl::QueueHandlerImpl(SemanticState& session)
+    : HandlerHelper(session), broker(getBroker()),
+      //record connection id and userid for deleting exclsuive queues after 
session has ended:
+      connectionId(getConnection().getUrl()), 
userId(getConnection().getUserId())
 {}
 
 
@@ -225,7 +228,7 @@ void SessionAdapter::QueueHandlerImpl::d
         Queue::shared_ptr q(exclusiveQueues.front());
         q->releaseExclusiveOwnership();
         if (q->canAutoDelete()) {
-            Queue::tryAutoDelete(broker, q);
+            Queue::tryAutoDelete(broker, q, connectionId, userId);
         }
         exclusiveQueues.erase(exclusiveQueues.begin());
     }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h?rev=1352874&r1=1352873&r2=1352874&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h Fri Jun 22 13:18:05 
2012
@@ -121,6 +121,9 @@ class Queue;
     {
         Broker& broker;
         std::vector< boost::shared_ptr<Queue> > exclusiveQueues;
+        //connectionId and userId are needed for queue-delete events for auto 
deleted, exclusive queues
+        std::string connectionId;
+        std::string userId;
 
       public:
         QueueHandlerImpl(SemanticState& session);

Modified: qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp?rev=1352874&r1=1352873&r2=1352874&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp Fri Jun 22 13:18:05 2012
@@ -68,6 +68,7 @@ struct Options : public qpid::Options
     bool reportHeader;
     string readyAddress;
     uint receiveRate;
+    std::string replyto;
 
     Options(const std::string& argv0=std::string())
         : qpid::Options("Options"),
@@ -114,6 +115,7 @@ struct Options : public qpid::Options
             ("report-header", qpid::optValue(reportHeader, "yes|no"), "Headers 
on report.")
             ("ready-address", qpid::optValue(readyAddress, "ADDRESS"), "send a 
message to this address when ready to receive")
             ("receive-rate", qpid::optValue(receiveRate,"N"), "Receive at rate 
of N messages/second. 0 means receive as fast as possible.")
+            ("reply-to", qpid::optValue(replyto, "REPLY-TO"), "specify 
reply-to address on response messages")
             ("help", qpid::optValue(help), "print this usage statement");
         add(log);
     }
@@ -246,6 +248,9 @@ int main(int argc, char ** argv)
                         s = session.createSender(msg.getReplyTo());
                         s.setCapacity(opts.capacity);
                     }
+                    if (!opts.replyto.empty()) {
+                        msg.setReplyTo(Address(opts.replyto));
+                    }
                     s.send(msg);
                 }
                 if (opts.receiveRate) {

Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py?rev=1352874&r1=1352873&r2=1352874&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py Fri Jun 22 
13:18:05 2012
@@ -36,3 +36,4 @@ from extensions import *
 from msg_groups import *
 from new_api import *
 from stats import *
+from qmf_events import *



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to