Author: gsim
Date: Fri Jun 22 13:18:05 2012
New Revision: 1352874
URL: http://svn.apache.org/viewvc?rev=1352874&view=rev
Log:
QPID-4075: Raise delete event for autodeleted queues also
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h
qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp
qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1352874&r1=1352873&r2=1352874&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Fri Jun 22 13:18:05 2012
@@ -49,6 +49,7 @@
#include "qpid/types/Variant.h"
#include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h"
#include "qmf/org/apache/qpid/broker/ArgsQueueReroute.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
#include <iostream>
#include <algorithm>
@@ -1484,12 +1485,15 @@ boost::shared_ptr<Exchange> Queue::getAl
return alternateExchange;
}
-void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue)
+void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue, const
std::string& connectionId, const std::string& userId)
{
if (broker.getQueues().destroyIf(queue->getName(),
boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) {
QPID_LOG(debug, "Auto-deleting " << queue->getName());
queue->destroyed();
+
+ if (broker.getManagementAgent())
+
broker.getManagementAgent()->raiseEvent(_qmf::EventQueueDelete(connectionId,
userId, queue->getName()));
}
}
@@ -1497,9 +1501,11 @@ struct AutoDeleteTask : qpid::sys::Timer
{
Broker& broker;
Queue::shared_ptr queue;
+ std::string connectionId;
+ std::string userId;
- AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime)
- : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion:"+q->getName()),
broker(b), queue(q) {}
+ AutoDeleteTask(Broker& b, Queue::shared_ptr q, const std::string& cId,
const std::string& uId, AbsTime fireTime)
+ : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion:"+q->getName()),
broker(b), queue(q), connectionId(cId), userId(uId) {}
void fire()
{
@@ -1507,19 +1513,19 @@ struct AutoDeleteTask : qpid::sys::Timer
//created, but then became unused again before the task fired;
//in this case ignore this request as there will have already
//been a later task added
- tryAutoDeleteImpl(broker, queue);
+ tryAutoDeleteImpl(broker, queue, connectionId, userId);
}
};
-void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue)
+void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue, const
std::string& connectionId, const std::string& userId)
{
if (queue->autoDeleteTimeout && queue->canAutoDelete()) {
AbsTime time(now(), Duration(queue->autoDeleteTimeout * TIME_SEC));
- queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new
AutoDeleteTask(broker, queue, time));
+ queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new
AutoDeleteTask(broker, queue, connectionId, userId, time));
broker.getClusterTimer().add(queue->autoDeleteTask);
QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << "
initiated");
} else {
- tryAutoDeleteImpl(broker, queue);
+ tryAutoDeleteImpl(broker, queue, connectionId, userId);
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=1352874&r1=1352873&r2=1352874&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Fri Jun 22 13:18:05 2012
@@ -344,7 +344,7 @@ class Queue : public boost::enable_share
* exclusive owner
*/
static Queue::shared_ptr restore(QueueRegistry& queues, framing::Buffer&
buffer);
- static void tryAutoDelete(Broker& broker, Queue::shared_ptr);
+ static void tryAutoDelete(Broker& broker, Queue::shared_ptr, const
std::string& connectionId, const std::string& userId);
virtual void setExternalQueueStore(ExternalQueueStore* inst);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1352874&r1=1352873&r2=1352874&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri Jun 22 13:18:05
2012
@@ -72,7 +72,8 @@ SemanticState::SemanticState(DeliveryAda
dtxSelected(false),
authMsg(getSession().getBroker().getOptions().auth &&
!getSession().getConnection().isUserProxyAuth()),
userID(getSession().getConnection().getUserId()),
- closeComplete(false)
+ closeComplete(false),
+ connectionId(getSession().getConnection().getUrl())
{}
SemanticState::~SemanticState() {
@@ -428,7 +429,7 @@ void SemanticState::cancel(ConsumerImpl:
if(queue) {
queue->cancel(c);
if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) {
- Queue::tryAutoDelete(session.getBroker(), queue);
+ Queue::tryAutoDelete(session.getBroker(), queue, connectionId,
userID);
}
}
c->cancel();
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=1352874&r1=1352873&r2=1352874&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Fri Jun 22 13:18:05 2012
@@ -182,6 +182,8 @@ class SemanticState : private boost::non
const bool authMsg;
const std::string userID;
bool closeComplete;
+ //needed for queue delete events in auto-delete:
+ const std::string connectionId;
void route(boost::intrusive_ptr<Message> msg, Deliverable& strategy);
void checkDtxTimeout();
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1352874&r1=1352873&r2=1352874&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Fri Jun 22 13:18:05
2012
@@ -206,7 +206,10 @@ ExchangeBoundResult SessionAdapter::Exch
}
}
-SessionAdapter::QueueHandlerImpl::QueueHandlerImpl(SemanticState& session) :
HandlerHelper(session), broker(getBroker())
+SessionAdapter::QueueHandlerImpl::QueueHandlerImpl(SemanticState& session)
+ : HandlerHelper(session), broker(getBroker()),
+ //record connection id and userid for deleting exclusive queues after
session has ended:
+ connectionId(getConnection().getUrl()),
userId(getConnection().getUserId())
{}
@@ -225,7 +228,7 @@ void SessionAdapter::QueueHandlerImpl::d
Queue::shared_ptr q(exclusiveQueues.front());
q->releaseExclusiveOwnership();
if (q->canAutoDelete()) {
- Queue::tryAutoDelete(broker, q);
+ Queue::tryAutoDelete(broker, q, connectionId, userId);
}
exclusiveQueues.erase(exclusiveQueues.begin());
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h?rev=1352874&r1=1352873&r2=1352874&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h Fri Jun 22 13:18:05
2012
@@ -121,6 +121,9 @@ class Queue;
{
Broker& broker;
std::vector< boost::shared_ptr<Queue> > exclusiveQueues;
+ //connectionId and userId are needed for queue-delete events for auto
deleted, exclusive queues
+ std::string connectionId;
+ std::string userId;
public:
QueueHandlerImpl(SemanticState& session);
Modified: qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp?rev=1352874&r1=1352873&r2=1352874&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp Fri Jun 22 13:18:05 2012
@@ -68,6 +68,7 @@ struct Options : public qpid::Options
bool reportHeader;
string readyAddress;
uint receiveRate;
+ std::string replyto;
Options(const std::string& argv0=std::string())
: qpid::Options("Options"),
@@ -114,6 +115,7 @@ struct Options : public qpid::Options
("report-header", qpid::optValue(reportHeader, "yes|no"), "Headers
on report.")
("ready-address", qpid::optValue(readyAddress, "ADDRESS"), "send a
message to this address when ready to receive")
("receive-rate", qpid::optValue(receiveRate,"N"), "Receive at rate
of N messages/second. 0 means receive as fast as possible.")
+ ("reply-to", qpid::optValue(replyto, "REPLY-TO"), "specify
reply-to address on response messages")
("help", qpid::optValue(help), "print this usage statement");
add(log);
}
@@ -246,6 +248,9 @@ int main(int argc, char ** argv)
s = session.createSender(msg.getReplyTo());
s.setCapacity(opts.capacity);
}
+ if (!opts.replyto.empty()) {
+ msg.setReplyTo(Address(opts.replyto));
+ }
s.send(msg);
}
if (opts.receiveRate) {
Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py?rev=1352874&r1=1352873&r2=1352874&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py Fri Jun 22
13:18:05 2012
@@ -36,3 +36,4 @@ from extensions import *
from msg_groups import *
from new_api import *
from stats import *
+from qmf_events import *
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]