Author: gsim Date: Sat Jul 13 09:15:49 2013 New Revision: 1502766 URL: http://svn.apache.org/r1502766 Log: QPID-3247: add policy for self-struct subscription queue
Added: qpid/trunk/qpid/cpp/src/qpid/broker/SelfDestructQueue.cpp qpid/trunk/qpid/cpp/src/qpid/broker/SelfDestructQueue.h Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt qpid/trunk/qpid/cpp/src/Makefile.am qpid/trunk/qpid/cpp/src/qpid/broker/QueueFactory.cpp qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.h qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=1502766&r1=1502765&r2=1502766&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original) +++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Sat Jul 13 09:15:49 2013 @@ -1311,6 +1311,7 @@ set (qpidbroker_SOURCES qpid/broker/SelectorToken.cpp qpid/broker/SelectorValue.h qpid/broker/SelectorValue.cpp + qpid/broker/SelfDestructQueue.cpp qpid/broker/SemanticState.h qpid/broker/SemanticState.cpp qpid/broker/SessionAdapter.cpp Modified: qpid/trunk/qpid/cpp/src/Makefile.am URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=1502766&r1=1502765&r2=1502766&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/Makefile.am (original) +++ qpid/trunk/qpid/cpp/src/Makefile.am Sat Jul 13 09:15:49 2013 @@ -748,6 +748,8 @@ libqpidbroker_la_SOURCES = \ qpid/broker/SelectorToken.h \ qpid/broker/SelectorValue.cpp \ qpid/broker/SelectorValue.h \ + qpid/broker/SelfDestructQueue.h \ + qpid/broker/SelfDestructQueue.cpp \ qpid/broker/SemanticState.cpp \ qpid/broker/SemanticState.h \ qpid/broker/SessionAdapter.cpp \ Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueFactory.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueFactory.cpp?rev=1502766&r1=1502765&r2=1502766&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/QueueFactory.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueFactory.cpp Sat Jul 13 09:15:49 2013 @@ -33,6 +33,7 @@ #include "qpid/broker/PagedQueue.h" #include "qpid/broker/PriorityQueue.h" #include "qpid/broker/QueueFlowLimit.h" +#include "qpid/broker/SelfDestructQueue.h" #include "qpid/broker/ThresholdAlerts.h" #include "qpid/broker/FifoDistributor.h" #include "qpid/log/Statement.h" @@ -53,6 +54,8 @@ boost::shared_ptr<Queue> QueueFactory::c boost::shared_ptr<Queue> queue; if (settings.dropMessagesAtLimit) { queue = boost::shared_ptr<Queue>(new LossyQueue(name, settings, settings.durable ? store : 0, parent, broker)); + } else if (settings.selfDestructAtLimit) { + queue = boost::shared_ptr<Queue>(new SelfDestructQueue(name, settings, settings.durable ? store : 0, parent, broker)); } else if (settings.lvqKey.size()) { std::auto_ptr<MessageMap> map(new MessageMap(settings.lvqKey)); queue = boost::shared_ptr<Queue>(new Lvq(name, map, settings, settings.durable ? store : 0, parent, broker)); Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp?rev=1502766&r1=1502765&r2=1502766&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp Sat Jul 13 09:15:49 2013 @@ -38,6 +38,7 @@ const std::string MAX_FILE_SIZE("qpid.fi const std::string POLICY_TYPE("qpid.policy_type"); const std::string POLICY_TYPE_REJECT("reject"); const std::string POLICY_TYPE_RING("ring"); +const std::string POLICY_TYPE_SELF_DESTRUCT("self-destruct"); const std::string NO_LOCAL("no-local"); const std::string BROWSE_ONLY("qpid.browse-only"); const std::string TRACE_ID("qpid.trace.id"); @@ -96,6 +97,7 @@ QueueSettings::QueueSettings(bool d, boo shareGroups(false), addTimestamp(false), dropMessagesAtLimit(false), + selfDestructAtLimit(false), paging(false), maxPages(0), pageFactor(0), @@ -120,6 +122,9 @@ bool QueueSettings::handle(const std::st if (value.getString() == POLICY_TYPE_RING) { dropMessagesAtLimit = true; return true; + } else if (value.getString() == POLICY_TYPE_SELF_DESTRUCT) { + selfDestructAtLimit = true; + return true; } else if (value.getString() == POLICY_TYPE_REJECT) { //do nothing, thats the default return true; Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.h?rev=1502766&r1=1502765&r2=1502766&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.h Sat Jul 13 09:15:49 2013 @@ -70,6 +70,7 @@ struct QueueSettings QueueDepth maxDepth; bool dropMessagesAtLimit;//aka ring queue policy + bool selfDestructAtLimit; //PagedQueue: bool paging; Added: qpid/trunk/qpid/cpp/src/qpid/broker/SelfDestructQueue.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SelfDestructQueue.cpp?rev=1502766&view=auto ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/SelfDestructQueue.cpp (added) +++ qpid/trunk/qpid/cpp/src/qpid/broker/SelfDestructQueue.cpp Sat Jul 13 09:15:49 2013 @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "SelfDestructQueue.h" +#include "AclModule.h" +#include "Broker.h" +#include "QueueDepth.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace broker { +SelfDestructQueue::SelfDestructQueue(const std::string& n, const QueueSettings& s, MessageStore* const ms, management::Manageable* p, Broker* b) : Queue(n, s, ms, p, b) +{ + QPID_LOG_CAT(debug, model, "Self-destruct queue created: " << name); +} +bool SelfDestructQueue::checkDepth(const QueueDepth& increment, const Message&) +{ + if (settings.maxDepth && (settings.maxDepth - current < increment)) { + broker->getQueues().destroy(name); + if (broker->getAcl()) + broker->getAcl()->recordDestroyQueue(name); + QPID_LOG_CAT(debug, model, "Queue " << name << " deleted itself due to reaching limit: " << current << " (policy is " << settings.maxDepth << ")"); + destroyed(); + } + current += increment; + return true; +} +}} // namespace qpid::broker Added: qpid/trunk/qpid/cpp/src/qpid/broker/SelfDestructQueue.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SelfDestructQueue.h?rev=1502766&view=auto ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/SelfDestructQueue.h (added) +++ qpid/trunk/qpid/cpp/src/qpid/broker/SelfDestructQueue.h Sat Jul 13 09:15:49 2013 @@ -0,0 +1,45 @@ +#ifndef QPID_BROKER_SELFDESTRUCTQUEUE_H +#define QPID_BROKER_SELFDESTRUCTQUEUE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/Queue.h" + +namespace qpid { +namespace broker { + +/** + * Deletes itself when breaching specified maximum depth (useful as + * subscription queue for consumers that should be ejecetd from topic + * when they can't keep up). + */ +class SelfDestructQueue : public Queue +{ + public: + public: + SelfDestructQueue(const std::string&, const QueueSettings&, MessageStore* const, management::Manageable*, Broker*); + bool checkDepth(const QueueDepth& increment, const Message&); + private: + private: +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_SELFDESTRUCTQUEUE_H*/ Modified: qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=1502766&r1=1502765&r2=1502766&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp Sat Jul 13 09:15:49 2013 @@ -1289,6 +1289,32 @@ QPID_AUTO_TEST_CASE(testSimpleRequestRes BOOST_CHECK_EQUAL(m.getSubject(), original.getSubject()); } +QPID_AUTO_TEST_CASE(testSelfDestructQueue) +{ + MessagingFixture fix; + //create receiver on temp queue for responses (using shorthand for temp queue) + Session other = fix.connection.createSession(); + Receiver r1 = other.createReceiver("amq.fanout; {link:{reliability:at-least-once, x-declare:{arguments:{qpid.max_count:10,qpid.policy_type:self-destruct}}}}"); + Receiver r2 = fix.session.createReceiver("amq.fanout"); + //send request + Sender s = fix.session.createSender("amq.fanout"); + for (uint i = 0; i < 20; ++i) { + s.send(Message((boost::format("MSG_%1%") % (i+1)).str())); + } + try { + ScopedSuppressLogging sl; + for (uint i = 0; i < 20; ++i) { + r1.fetch(Duration::SECOND); + } + BOOST_FAIL("Expected exception."); + } catch (const qpid::messaging::MessagingException&) { + } + + for (uint i = 0; i < 20; ++i) { + BOOST_CHECK_EQUAL(r2.fetch(Duration::SECOND).getContent(), (boost::format("MSG_%1%") % (i+1)).str()); + } +} + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org