Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.cpp Fri Aug 10 12:04:27 2012 @@ -68,7 +68,7 @@ bool ReplicationTest::isReplicated( bool ReplicationTest::isReplicated(ReplicateLevel level, const broker::Queue& q) { - return isReplicated(level, q.getSettings(), q.isAutoDelete(), q.hasExclusiveOwner()); + return isReplicated(level, q.getSettings().storeSettings, q.isAutoDelete(), q.hasExclusiveOwner()); }
Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Fri Aug 10 12:04:27 2012 @@ -31,6 +31,7 @@ #include <qpid/broker/Message.h> #include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/FieldValue.h" +#include "qpid/broker/amqp_0_10/MessageTransfer.h" #include "qpid/sys/Time.h" #include "qpid/sys/Thread.h" #include "qpid/broker/ConnectionState.h" @@ -535,7 +536,7 @@ void ManagementAgent::sendBufferLH(Buffe } if (exchange.get() == 0) return; - intrusive_ptr<Message> msg(new Message()); + intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer()); AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange->getName (), 0, 0))); AMQFrame header((AMQHeaderBody())); AMQFrame content((AMQContentBody())); @@ -547,24 +548,26 @@ void ManagementAgent::sendBufferLH(Buffe header.setEof(false); content.setBof(false); - msg->getFrames().append(method); - msg->getFrames().append(header); + transfer->getFrames().append(method); + transfer->getFrames().append(header); MessageProperties* props = - msg->getFrames().getHeaders()->get<MessageProperties>(true); + transfer->getFrames().getHeaders()->get<MessageProperties>(true); props->setContentLength(length); DeliveryProperties* dp = - msg->getFrames().getHeaders()->get<DeliveryProperties>(true); + transfer->getFrames().getHeaders()->get<DeliveryProperties>(true); dp->setRoutingKey(routingKey); - msg->getFrames().append(content); - msg->setIsManagementMessage(true); + transfer->getFrames().append(content); + + Message msg(transfer, transfer); + msg.setIsManagementMessage(true); { sys::Mutex::ScopedUnlock u(userLock); - DeliverableMessage deliverable (msg); + DeliverableMessage deliverable (msg, 0); try { exchange->route(deliverable); } catch(exception&) {} @@ -602,7 +605,7 @@ void ManagementAgent::sendBufferLH(const } if (exchange.get() == 0) return; - intrusive_ptr<Message> msg(new Message()); + intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer()); AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange->getName (), 0, 0))); AMQFrame header((AMQHeaderBody())); AMQFrame content((AMQContentBody(data))); @@ -612,11 +615,11 @@ void ManagementAgent::sendBufferLH(const header.setEof(false); content.setBof(false); - msg->getFrames().append(method); - msg->getFrames().append(header); + transfer->getFrames().append(method); + transfer->getFrames().append(header); MessageProperties* props = - msg->getFrames().getHeaders()->get<MessageProperties>(true); + transfer->getFrames().getHeaders()->get<MessageProperties>(true); props->setContentLength(data.length()); if (!cid.empty()) { props->setCorrelationId(cid); @@ -625,23 +628,24 @@ void ManagementAgent::sendBufferLH(const props->setAppId("qmf2"); for (i = headers.begin(); i != headers.end(); ++i) { - msg->insertCustomProperty(i->first, i->second.asString()); + props->getApplicationHeaders().setString(i->first, i->second.asString()); } DeliveryProperties* dp = - msg->getFrames().getHeaders()->get<DeliveryProperties>(true); + transfer->getFrames().getHeaders()->get<DeliveryProperties>(true); dp->setRoutingKey(routingKey); if (ttl_msec) { dp->setTtl(ttl_msec); - msg->computeExpiration(broker->getExpiryPolicy()); } - msg->getFrames().append(content); - msg->setIsManagementMessage(true); + transfer->getFrames().append(content); + Message msg(transfer, transfer); + msg.setIsManagementMessage(true); + msg.computeExpiration(broker->getExpiryPolicy()); { sys::Mutex::ScopedUnlock u(userLock); - DeliverableMessage deliverable (msg); + DeliverableMessage deliverable (msg, 0); try { exchange->route(deliverable); } catch(exception&) {} @@ -2135,19 +2139,20 @@ bool ManagementAgent::authorizeAgentMess // authorized or not. In this case, return true (authorized) if there is no ACL in place, // otherwise return false; // - if (msg.encodedSize() > MA_BUFFER_SIZE) + if (msg.getContentSize() > MA_BUFFER_SIZE) return broker->getAcl() == 0; - msg.encodeContent(inBuffer); + inBuffer.putRawData(msg.getContent()); uint32_t bufferLen = inBuffer.getPosition(); inBuffer.reset(); + qpid::broker::amqp_0_10::MessageTransfer& transfer(qpid::broker::amqp_0_10::MessageTransfer::get(msg)); const framing::MessageProperties* p = - msg.getFrames().getHeaders()->get<framing::MessageProperties>(); + transfer.getFrames().getHeaders()->get<framing::MessageProperties>(); - const framing::FieldTable *headers = msg.getApplicationHeaders(); + const framing::FieldTable *headers = p ? &p->getApplicationHeaders() : 0; - if (headers && msg.getAppId() == "qmf2") + if (headers && p->getAppId() == "qmf2") { mapMsg = true; @@ -2238,8 +2243,9 @@ bool ManagementAgent::authorizeAgentMess // authorization failed, send reply if replyTo present + qpid::broker::amqp_0_10::MessageTransfer& transfer(qpid::broker::amqp_0_10::MessageTransfer::get(msg)); const framing::MessageProperties* p = - msg.getFrames().getHeaders()->get<framing::MessageProperties>(); + transfer.getFrames().getHeaders()->get<framing::MessageProperties>(); if (p && p->hasReplyTo()) { const framing::ReplyTo& rt = p->getReplyTo(); string rte = rt.getExchange(); @@ -2277,8 +2283,9 @@ void ManagementAgent::dispatchAgentComma { string rte; string rtk; + qpid::broker::amqp_0_10::MessageTransfer& transfer(qpid::broker::amqp_0_10::MessageTransfer::get(msg)); const framing::MessageProperties* p = - msg.getFrames().getHeaders()->get<framing::MessageProperties>(); + transfer.getFrames().getHeaders()->get<framing::MessageProperties>(); if (p && p->hasReplyTo()) { const framing::ReplyTo& rt = p->getReplyTo(); rte = rt.getExchange(); @@ -2290,19 +2297,19 @@ void ManagementAgent::dispatchAgentComma Buffer inBuffer(inputBuffer, MA_BUFFER_SIZE); uint8_t opcode; - if (msg.encodedSize() > MA_BUFFER_SIZE) { + if (msg.getContentSize() > MA_BUFFER_SIZE) { QPID_LOG(debug, "ManagementAgent::dispatchAgentCommandLH: Message too large: " << - msg.encodedSize()); + msg.getContentSize()); return; } - msg.encodeContent(inBuffer); + inBuffer.putRawData(msg.getContent()); uint32_t bufferLen = inBuffer.getPosition(); inBuffer.reset(); ScopedManagementContext context((const qpid::broker::ConnectionState*) msg.getPublisher()); - const framing::FieldTable *headers = msg.getApplicationHeaders(); - if (headers && msg.getAppId() == "qmf2") + const framing::FieldTable *headers = p ? &p->getApplicationHeaders() : 0; + if (headers && p->getAppId() == "qmf2") { string opcode = headers->getAsString("qmf.opcode"); string contentType = headers->getAsString("qmf.content"); Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp Fri Aug 10 12:04:27 2012 @@ -43,11 +43,9 @@ ManagementDirectExchange::ManagementDire void ManagementDirectExchange::route(Deliverable& msg) { bool routeIt = true; - const std::string& routingKey = msg.getMessage().getRoutingKey(); - const FieldTable* args = msg.getMessage().getApplicationHeaders(); if (managementAgent) - routeIt = managementAgent->dispatchCommand(msg, routingKey, args, false, qmfVersion); + routeIt = managementAgent->dispatchCommand(msg, msg.getMessage().getRoutingKey(), 0/*args - TODO*/, false, qmfVersion); if (routeIt) DirectExchange::route(msg); Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp Fri Aug 10 12:04:27 2012 @@ -42,12 +42,10 @@ ManagementTopicExchange::ManagementTopic void ManagementTopicExchange::route(Deliverable& msg) { bool routeIt = true; - const std::string& routingKey = msg.getMessage().getRoutingKey(); - const FieldTable* args = msg.getMessage().getApplicationHeaders(); // Intercept management agent commands if (managementAgent) - routeIt = managementAgent->dispatchCommand(msg, routingKey, args, true, qmfVersion); + routeIt = managementAgent->dispatchCommand(msg, msg.getMessage().getRoutingKey(), 0/*args - TODO*/, true, qmfVersion); if (routeIt) TopicExchange::route(msg); Modified: qpid/trunk/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp Fri Aug 10 12:04:27 2012 @@ -249,7 +249,7 @@ MessageStorePlugin::destroy(const broker void MessageStorePlugin::stage(const boost::intrusive_ptr<broker::PersistableMessage>& msg) { - if (msg->getPersistenceId() == 0 && !msg->isContentReleased()) { + if (msg->getPersistenceId() == 0) { provider->second->stage(msg); } } Modified: qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp Fri Aug 10 12:04:27 2012 @@ -27,6 +27,7 @@ #include "qpid/log/Statement.h" #include "qpid/broker/FedOps.h" +#include "qpid/broker/MapHandler.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/FieldValue.h" #include "qpid/framing/reply_exceptions.h" @@ -198,7 +199,52 @@ bool XmlExchange::unbind(Queue::shared_p } } -bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args, bool parse_message_content) +namespace { +class DefineExternals : public MapHandler +{ + public: + DefineExternals(DynamicContext* c) : context(c) { assert(context); } + void handleUint8(const MapHandler::CharSequence& key, uint8_t value) { process(std::string(key.data, key.size), (int) value); } + void handleUint16(const MapHandler::CharSequence& key, uint16_t value) { process(std::string(key.data, key.size), (int) value); } + void handleUint32(const MapHandler::CharSequence& key, uint32_t value) { process(std::string(key.data, key.size), (int) value); } + void handleUint64(const MapHandler::CharSequence& key, uint64_t value) { process(std::string(key.data, key.size), (int) value); } + void handleInt8(const MapHandler::CharSequence& key, int8_t value) { process(std::string(key.data, key.size), (int) value); } + void handleInt16(const MapHandler::CharSequence& key, int16_t value) { process(std::string(key.data, key.size), (int) value); } + void handleInt32(const MapHandler::CharSequence& key, int32_t value) { process(std::string(key.data, key.size), (int) value); } + void handleInt64(const MapHandler::CharSequence& key, int64_t value) { process(std::string(key.data, key.size), (int) value); } + void handleFloat(const MapHandler::CharSequence& key, float value) { process(std::string(key.data, key.size), value); } + void handleDouble(const MapHandler::CharSequence& key, double value) { process(std::string(key.data, key.size), value); } + void handleString(const MapHandler::CharSequence& key, const MapHandler::CharSequence& value, const MapHandler::CharSequence& /*encoding*/) + { + process(std::string(key.data, key.size), std::string(value.data, value.size)); + } + void handleVoid(const MapHandler::CharSequence&) {} + private: + void process(const std::string& key, double value) + { + QPID_LOG(trace, "XmlExchange, external variable (double): " << key << " = " << value); + Item::Ptr item = context->getItemFactory()->createDouble(value, context); + context->setExternalVariable(X(key.c_str()), item); + } + void process(const std::string& key, int value) + { + QPID_LOG(trace, "XmlExchange, external variable (int):" << key << " = " << value); + Item::Ptr item = context->getItemFactory()->createInteger(value, context); + context->setExternalVariable(X(key.c_str()), item); + } + void process(const std::string& key, const std::string& value) + { + QPID_LOG(trace, "XmlExchange, external variable (string):" << key << " = " << value); + Item::Ptr item = context->getItemFactory()->createString(X(value.c_str()), context); + context->setExternalVariable(X(key.c_str()), item); + } + + DynamicContext* context; +}; + +} + +bool XmlExchange::matches(Query& query, Deliverable& msg, bool parse_message_content) { std::string msgContent; @@ -212,7 +258,7 @@ bool XmlExchange::matches(Query& query, if (parse_message_content) { - msg.getMessage().getFrames().getContent(msgContent); + msgContent = msg.getMessage().getContent(); QPID_LOG(trace, "matches: message content is [" << msgContent << "]"); @@ -231,28 +277,8 @@ bool XmlExchange::matches(Query& query, } } - if (args) { - FieldTable::ValueMap::const_iterator v = args->begin(); - for(; v != args->end(); ++v) { - - if (v->second->convertsTo<double>()) { - QPID_LOG(trace, "XmlExchange, external variable (double): " << v->first << " = " << v->second->get<double>()); - Item::Ptr value = context->getItemFactory()->createDouble(v->second->get<double>(), context.get()); - context->setExternalVariable(X(v->first.c_str()), value); - } - else if (v->second->convertsTo<int>()) { - QPID_LOG(trace, "XmlExchange, external variable (int):" << v->first << " = " << v->second->getData().getInt()); - Item::Ptr value = context->getItemFactory()->createInteger(v->second->get<int>(), context.get()); - context->setExternalVariable(X(v->first.c_str()), value); - } - else if (v->second->convertsTo<std::string>()) { - QPID_LOG(trace, "XmlExchange, external variable (string):" << v->first << " = " << v->second->getData().getString().c_str()); - Item::Ptr value = context->getItemFactory()->createString(X(v->second->get<std::string>().c_str()), context.get()); - context->setExternalVariable(X(v->first.c_str()), value); - } - - } - } + DefineExternals f(context.get()); + msg.getMessage().processProperties(f); Result result = query->execute(context.get()); #ifdef XQ_EFFECTIVE_BOOLEAN_VALUE_HPP @@ -286,7 +312,6 @@ bool XmlExchange::matches(Query& query, void XmlExchange::route(Deliverable& msg) { const std::string& routingKey = msg.getMessage().getRoutingKey(); - const FieldTable* args = msg.getMessage().getApplicationHeaders(); PreRoute pr(msg, this); try { XmlBinding::vector::ConstPtr p; @@ -298,7 +323,7 @@ void XmlExchange::route(Deliverable& msg } for (std::vector<XmlBinding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++) { - if (matches((*i)->xquery, msg, args, (*i)->parse_message_content)) { + if (matches((*i)->xquery, msg, (*i)->parse_message_content)) { b->push_back(*i); } } Modified: qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.h?rev=1371676&r1=1371675&r2=1371676&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.h Fri Aug 10 12:04:27 2012 @@ -65,7 +65,7 @@ class XmlExchange : public virtual Excha qpid::sys::RWlock lock; - bool matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args, bool parse_message_content); + bool matches(Query& query, Deliverable& msg, bool parse_message_content); public: static const std::string typeName; Modified: qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt?rev=1371676&r1=1371675&r2=1371676&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt (original) +++ qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt Fri Aug 10 12:04:27 2012 @@ -126,6 +126,7 @@ set(unit_tests_to_build ExchangeTest HeadersExchangeTest MessageTest + QueueDepth QueueRegistryTest QueuePolicyTest QueueFlowLimitTest @@ -135,16 +136,12 @@ set(unit_tests_to_build TimerTest TopicExchangeTest TxBufferTest - TxPublishTest - MessageBuilderTest ManagementTest MessageReplayTracker ConsoleTest - QueueEvents ProxyTest RetryList FrameDecoder - ReplicationTest ClientMessageTest PollableCondition Variant Modified: qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp Fri Aug 10 12:04:27 2012 @@ -621,7 +621,7 @@ QPID_AUTO_TEST_CASE(testQueueDeleted) fix.session.queueDeclare(arg::queue="my-queue"); LocalQueue queue; fix.subs.subscribe(queue, "my-queue"); - + ScopedSuppressLogging sl; fix.session.queueDelete(arg::queue="my-queue"); BOOST_CHECK_THROW(queue.get(1*qpid::sys::TIME_SEC), qpid::framing::ResourceDeletedException); Modified: qpid/trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp Fri Aug 10 12:04:27 2012 @@ -49,7 +49,7 @@ QPID_AUTO_TEST_CASE(testSort) list<DeliveryRecord> records; for (list<SequenceNumber>::iterator i = ids.begin(); i != ids.end(); i++) { - DeliveryRecord r(QueuedMessage(0), Queue::shared_ptr(), "tag", Consumer::shared_ptr(), false, false, false); + DeliveryRecord r(QueueCursor(CONSUMER), framing::SequenceNumber(), Queue::shared_ptr(), "tag", Consumer::shared_ptr(), false, false, false); r.setId(*i); records.push_back(r); } Modified: qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp Fri Aug 10 12:04:27 2012 @@ -35,7 +35,6 @@ using std::string; -using boost::intrusive_ptr; using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; @@ -62,11 +61,9 @@ QPID_AUTO_TEST_CASE(testMe) queue.reset(); queue2.reset(); - intrusive_ptr<Message> msgPtr(MessageUtils::createMessage("exchange", "abc", false, "id")); - DeliverableMessage msg(msgPtr); + DeliverableMessage msg(MessageUtils::createMessage("exchange", "abc"), 0); topic.route(msg); direct.route(msg); - } QPID_AUTO_TEST_CASE(testIsBound) @@ -170,16 +167,6 @@ QPID_AUTO_TEST_CASE(testDeleteGetAndRede BOOST_CHECK_EQUAL(string("direct"), response.first->getType()); } -intrusive_ptr<Message> cmessage(std::string exchange, std::string routingKey) { - intrusive_ptr<Message> msg(new Message()); - AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0))); - AMQFrame header((AMQHeaderBody())); - msg->getFrames().append(method); - msg->getFrames().append(header); - msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey); - return msg; -} - QPID_AUTO_TEST_CASE(testSequenceOptions) { FieldTable args; @@ -189,46 +176,35 @@ QPID_AUTO_TEST_CASE(testSequenceOptions) { DirectExchange direct("direct1", false, args); - intrusive_ptr<Message> msg1 = cmessage("e", "abc"); - intrusive_ptr<Message> msg2 = cmessage("e", "abc"); - intrusive_ptr<Message> msg3 = cmessage("e", "abc"); - - DeliverableMessage dmsg1(msg1); - DeliverableMessage dmsg2(msg2); - DeliverableMessage dmsg3(msg3); - - direct.route(dmsg1); - direct.route(dmsg2); - direct.route(dmsg3); - - BOOST_CHECK_EQUAL(1, msg1->getApplicationHeaders()->getAsInt64("qpid.msg_sequence")); - BOOST_CHECK_EQUAL(2, msg2->getApplicationHeaders()->getAsInt64("qpid.msg_sequence")); - BOOST_CHECK_EQUAL(3, msg3->getApplicationHeaders()->getAsInt64("qpid.msg_sequence")); + DeliverableMessage msg1(MessageUtils::createMessage("e", "abc"), 0); + DeliverableMessage msg2(MessageUtils::createMessage("e", "abc"), 0); + DeliverableMessage msg3(MessageUtils::createMessage("e", "abc"), 0); + + direct.route(msg1); + direct.route(msg2); + direct.route(msg3); + + BOOST_CHECK_EQUAL(1, msg1.getMessage().getAnnotation("qpid.msg_sequence").asInt64()); + BOOST_CHECK_EQUAL(2, msg2.getMessage().getAnnotation("qpid.msg_sequence").asInt64()); + BOOST_CHECK_EQUAL(3, msg3.getMessage().getAnnotation("qpid.msg_sequence").asInt64()); FanOutExchange fanout("fanout1", false, args); HeadersExchange header("headers1", false, args); TopicExchange topic ("topic1", false, args); // check other exchanges, that they preroute - intrusive_ptr<Message> msg4 = cmessage("e", "abc"); - intrusive_ptr<Message> msg5 = cmessage("e", "abc"); - - // Need at least empty header for the HeadersExchange to route at all - msg5->insertCustomProperty("", ""); - intrusive_ptr<Message> msg6 = cmessage("e", "abc"); + DeliverableMessage msg4(MessageUtils::createMessage("e", "abc"), 0); + DeliverableMessage msg5(MessageUtils::createMessage("e", "abc"), 0); + DeliverableMessage msg6(MessageUtils::createMessage("e", "abc"), 0); - DeliverableMessage dmsg4(msg4); - DeliverableMessage dmsg5(msg5); - DeliverableMessage dmsg6(msg6); + fanout.route(msg4); + BOOST_CHECK_EQUAL(1, msg4.getMessage().getAnnotation("qpid.msg_sequence").asInt64()); - fanout.route(dmsg4); - BOOST_CHECK_EQUAL(1, msg4->getApplicationHeaders()->getAsInt64("qpid.msg_sequence")); + header.route(msg5); + BOOST_CHECK_EQUAL(1, msg5.getMessage().getAnnotation("qpid.msg_sequence").asInt64()); - header.route(dmsg5); - BOOST_CHECK_EQUAL(1, msg5->getApplicationHeaders()->getAsInt64("qpid.msg_sequence")); - - topic.route(dmsg6); - BOOST_CHECK_EQUAL(1, msg6->getApplicationHeaders()->getAsInt64("qpid.msg_sequence")); + topic.route(msg6); + BOOST_CHECK_EQUAL(1, msg6.getMessage().getAnnotation("qpid.msg_sequence").asInt64()); direct.encode(buffer); } { @@ -237,11 +213,10 @@ QPID_AUTO_TEST_CASE(testSequenceOptions) buffer.reset(); DirectExchange::shared_ptr exch_dec = Exchange::decode(exchanges, buffer); - intrusive_ptr<Message> msg1 = cmessage("e", "abc"); - DeliverableMessage dmsg1(msg1); - exch_dec->route(dmsg1); + DeliverableMessage msg1(MessageUtils::createMessage("e", "abc"), 0); + exch_dec->route(msg1); - BOOST_CHECK_EQUAL(4, msg1->getApplicationHeaders()->getAsInt64("qpid.msg_sequence")); + BOOST_CHECK_EQUAL(4, msg1.getMessage().getAnnotation("qpid.msg_sequence").asInt64()); } delete [] buff; @@ -256,9 +231,11 @@ QPID_AUTO_TEST_CASE(testIVEOption) HeadersExchange header("headers1", false, args); TopicExchange topic ("topic1", false, args); - intrusive_ptr<Message> msg1 = cmessage("direct1", "abc"); - msg1->insertCustomProperty("a", "abc"); - DeliverableMessage dmsg1(msg1); + qpid::types::Variant::Map properties; + properties["routing-key"] = "abc"; + properties["a"] = "abc"; + Message msg1 = MessageUtils::createMessage(properties, "my-message", "direct1"); + DeliverableMessage dmsg1(msg1, 0); FieldTable args2; args2.setString("x-match", "any"); @@ -273,8 +250,6 @@ QPID_AUTO_TEST_CASE(testIVEOption) Queue::shared_ptr queue2(new Queue("queue2", true)); Queue::shared_ptr queue3(new Queue("queue3", true)); - BOOST_CHECK(HeadersExchange::match(args2, msg1->getProperties<MessageProperties>()->getApplicationHeaders())); - BOOST_CHECK(direct.bind(queue, "abc", 0)); BOOST_CHECK(fanout.bind(queue1, "abc", 0)); BOOST_CHECK(header.bind(queue2, "", &args2)); @@ -287,7 +262,6 @@ QPID_AUTO_TEST_CASE(testIVEOption) } - QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests Modified: qpid/trunk/qpid/cpp/src/tests/Makefile.am URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=1371676&r1=1371675&r2=1371676&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/Makefile.am (original) +++ qpid/trunk/qpid/cpp/src/tests/Makefile.am Fri Aug 10 12:04:27 2012 @@ -96,6 +96,7 @@ unit_test_SOURCES= unit_test.cpp unit_te ExchangeTest.cpp \ HeadersExchangeTest.cpp \ MessageTest.cpp \ + QueueDepth.cpp \ QueueRegistryTest.cpp \ QueuePolicyTest.cpp \ QueueFlowLimitTest.cpp \ @@ -105,19 +106,15 @@ unit_test_SOURCES= unit_test.cpp unit_te TimerTest.cpp \ TopicExchangeTest.cpp \ TxBufferTest.cpp \ - TxPublishTest.cpp \ - MessageBuilderTest.cpp \ ConnectionOptions.h \ ForkedBroker.h \ ForkedBroker.cpp \ ManagementTest.cpp \ MessageReplayTracker.cpp \ ConsoleTest.cpp \ - QueueEvents.cpp \ ProxyTest.cpp \ RetryList.cpp \ FrameDecoder.cpp \ - ReplicationTest.cpp \ ClientMessageTest.cpp \ PollableCondition.cpp \ Variant.cpp \ Modified: qpid/trunk/qpid/cpp/src/tests/MessageTest.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessageTest.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/MessageTest.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/MessageTest.cpp Fri Aug 10 12:04:27 2012 @@ -24,6 +24,7 @@ #include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/FieldValue.h" #include "qpid/framing/Uuid.h" +#include "MessageUtils.h" #include "unit_test.h" @@ -43,49 +44,29 @@ QPID_AUTO_TEST_CASE(testEncodeDecode) { string exchange = "MyExchange"; string routingKey = "MyRoutingKey"; + uint64_t ttl(60); Uuid messageId(true); - string data1("abcdefg"); - string data2("hijklmn"); + string data("abcdefghijklmn"); - boost::intrusive_ptr<Message> msg(new Message()); - - AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0))); - AMQFrame header((AMQHeaderBody())); - AMQFrame content1((AMQContentBody(data1))); - AMQFrame content2((AMQContentBody(data2))); - - msg->getFrames().append(method); - msg->getFrames().append(header); - msg->getFrames().append(content1); - msg->getFrames().append(content2); - - MessageProperties* mProps = msg->getFrames().getHeaders()->get<MessageProperties>(true); - mProps->setContentLength(data1.size() + data2.size()); - mProps->setMessageId(messageId); - FieldTable applicationHeaders; - applicationHeaders.setString("abc", "xyz"); - mProps->setApplicationHeaders(applicationHeaders); - DeliveryProperties* dProps = msg->getFrames().getHeaders()->get<DeliveryProperties>(true); - dProps->setRoutingKey(routingKey); - dProps->setDeliveryMode(PERSISTENT); - BOOST_CHECK(msg->isPersistent()); - - std::vector<char> buff(msg->encodedSize()); - Buffer wbuffer(&buff[0], msg->encodedSize()); - msg->encode(wbuffer); - - Buffer rbuffer(&buff[0], msg->encodedSize()); - msg = new Message(); - msg->decodeHeader(rbuffer); - msg->decodeContent(rbuffer); - BOOST_CHECK_EQUAL(exchange, msg->getExchangeName()); - BOOST_CHECK_EQUAL(routingKey, msg->getRoutingKey()); - BOOST_CHECK_EQUAL((uint64_t) data1.size() + data2.size(), msg->contentSize()); - BOOST_CHECK_EQUAL((uint64_t) data1.size() + data2.size(), msg->getProperties<MessageProperties>()->getContentLength()); - BOOST_CHECK_EQUAL(messageId, msg->getProperties<MessageProperties>()->getMessageId()); - BOOST_CHECK_EQUAL(string("xyz"), msg->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("abc")); - BOOST_CHECK_EQUAL((uint8_t) PERSISTENT, msg->getProperties<DeliveryProperties>()->getDeliveryMode()); - BOOST_CHECK(msg->isPersistent()); + qpid::types::Variant::Map properties; + properties["routing-key"] = routingKey; + properties["ttl"] = ttl; + properties["durable"] = true; + properties["message-id"] = qpid::types::Uuid(messageId.data()); + properties["abc"] = "xyz"; + Message msg = MessageUtils::createMessage(properties, data); + + std::string buffer; + encode(msg, buffer); + msg = Message(); + decode(buffer, msg); + + BOOST_CHECK_EQUAL(routingKey, msg.getRoutingKey()); + BOOST_CHECK_EQUAL((uint64_t) data.size(), msg.getContentSize()); + BOOST_CHECK_EQUAL(data, msg.getContent()); + //BOOST_CHECK_EQUAL(messageId, msg->getProperties<MessageProperties>()->getMessageId()); + BOOST_CHECK_EQUAL(string("xyz"), msg.getPropertyAsString("abc")); + BOOST_CHECK(msg.isPersistent()); } QPID_AUTO_TEST_SUITE_END() Modified: qpid/trunk/qpid/cpp/src/tests/MessageUtils.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessageUtils.h?rev=1371676&r1=1371675&r2=1371676&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/MessageUtils.h (original) +++ qpid/trunk/qpid/cpp/src/tests/MessageUtils.h Fri Aug 10 12:04:27 2012 @@ -20,9 +20,11 @@ */ #include "qpid/broker/Message.h" +#include "qpid/broker/amqp_0_10/MessageTransfer.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/Uuid.h" +#include "qpid/types/Variant.h" using namespace qpid; using namespace broker; @@ -33,11 +35,46 @@ namespace tests { struct MessageUtils { - static boost::intrusive_ptr<Message> createMessage(const std::string& exchange="", const std::string& routingKey="", - const bool durable = false, const Uuid& messageId=Uuid(true), - uint64_t contentSize = 0) + static Message createMessage(const qpid::types::Variant::Map& properties, const std::string& content="", const std::string& destination = "") { - boost::intrusive_ptr<broker::Message> msg(new broker::Message()); + boost::intrusive_ptr<broker::amqp_0_10::MessageTransfer> msg(new broker::amqp_0_10::MessageTransfer()); + + AMQFrame method(( MessageTransferBody(ProtocolVersion(), destination, 0, 0))); + AMQFrame header((AMQHeaderBody())); + + msg->getFrames().append(method); + msg->getFrames().append(header); + if (content.size()) { + msg->getFrames().getHeaders()->get<MessageProperties>(true)->setContentLength(content.size()); + AMQFrame data((AMQContentBody(content))); + msg->getFrames().append(data); + } + for (qpid::types::Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) { + if (i->first == "routing-key" && !i->second.isVoid()) { + msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(i->second); + } else if (i->first == "message-id" && !i->second.isVoid()) { + qpid::types::Uuid id = i->second; + qpid::framing::Uuid id2(id.data()); + msg->getFrames().getHeaders()->get<MessageProperties>(true)->setMessageId(id2); + } else if (i->first == "ttl" && !i->second.isVoid()) { + msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setTtl(i->second); + } else if (i->first == "priority" && !i->second.isVoid()) { + msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setPriority(i->second); + } else if (i->first == "durable" && !i->second.isVoid()) { + msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setDeliveryMode(i->second.asBool() ? 2 : 1); + } else { + msg->getFrames().getHeaders()->get<MessageProperties>(true)->getApplicationHeaders().setString(i->first, i->second); + } + } + return Message(msg, msg); + } + + + static Message createMessage(const std::string& exchange="", const std::string& routingKey="", + uint64_t ttl = 0, bool durable = false, const Uuid& messageId=Uuid(true), + const std::string& content="") + { + boost::intrusive_ptr<broker::amqp_0_10::MessageTransfer> msg(new broker::amqp_0_10::MessageTransfer()); AMQFrame method(( MessageTransferBody(ProtocolVersion(), exchange, 0, 0))); AMQFrame header((AMQHeaderBody())); @@ -45,18 +82,18 @@ struct MessageUtils msg->getFrames().append(method); msg->getFrames().append(header); MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true); - props->setContentLength(contentSize); + props->setContentLength(content.size()); props->setMessageId(messageId); msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey); if (durable) msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setDeliveryMode(2); - return msg; - } - - static void addContent(boost::intrusive_ptr<Message> msg, const std::string& data) - { - AMQFrame content((AMQContentBody(data))); - msg->getFrames().append(content); + if (ttl) + msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setTtl(ttl); + if (content.size()) { + AMQFrame data((AMQContentBody(content))); + msg->getFrames().append(data); + } + return Message(msg, msg); } }; Added: qpid/trunk/qpid/cpp/src/tests/QueueDepth.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueDepth.cpp?rev=1371676&view=auto ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/QueueDepth.cpp (added) +++ qpid/trunk/qpid/cpp/src/tests/QueueDepth.cpp Fri Aug 10 12:04:27 2012 @@ -0,0 +1,105 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/broker/QueueDepth.h" + +#include "unit_test.h" + +namespace qpid { +namespace tests { + +QPID_AUTO_TEST_SUITE(QueueDepthTestSuite) + +using namespace qpid::broker; + +QPID_AUTO_TEST_CASE(testCompare) +{ + QueueDepth a(0, 0); + QueueDepth b(1, 1); + QueueDepth c(2, 2); + QueueDepth d(1, 1); + + BOOST_CHECK(a < b); + BOOST_CHECK(b < c); + BOOST_CHECK(a < c); + + BOOST_CHECK(b > a); + BOOST_CHECK(c > b); + BOOST_CHECK(c > a); + + BOOST_CHECK(b == d); + BOOST_CHECK(d == b); + BOOST_CHECK(a != b); + BOOST_CHECK(b != a); + + QueueDepth e; e.setCount(1); + QueueDepth f; f.setCount(2); + BOOST_CHECK(e < f); + BOOST_CHECK(f > e); + + QueueDepth g; g.setSize(1); + QueueDepth h; h.setSize(2); + BOOST_CHECK(g < h); + BOOST_CHECK(h > g); +} + +QPID_AUTO_TEST_CASE(testIncrement) +{ + QueueDepth a(5, 10); + QueueDepth b(3, 6); + QueueDepth c(8, 16); + a += b; + BOOST_CHECK(a == c); + BOOST_CHECK_EQUAL(8, a.getCount()); + BOOST_CHECK_EQUAL(16, a.getSize()); +} + +QPID_AUTO_TEST_CASE(testDecrement) +{ + QueueDepth a(5, 10); + QueueDepth b(3, 6); + QueueDepth c(2, 4); + a -= b; + BOOST_CHECK(a == c); + BOOST_CHECK_EQUAL(2, a.getCount()); + BOOST_CHECK_EQUAL(4, a.getSize()); +} + +QPID_AUTO_TEST_CASE(testAddition) +{ + QueueDepth a(5, 10); + QueueDepth b(3, 6); + + QueueDepth c = a + b; + BOOST_CHECK_EQUAL(8, c.getCount()); + BOOST_CHECK_EQUAL(16, c.getSize()); +} + +QPID_AUTO_TEST_CASE(testSubtraction) +{ + QueueDepth a(5, 10); + QueueDepth b(3, 6); + + QueueDepth c = a - b; + BOOST_CHECK_EQUAL(2, c.getCount()); + BOOST_CHECK_EQUAL(4, c.getSize()); +} + +QPID_AUTO_TEST_SUITE_END() + +}} // namespace qpid::tests Modified: qpid/trunk/qpid/cpp/src/tests/QueueFlowLimitTest.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueFlowLimitTest.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/QueueFlowLimitTest.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/QueueFlowLimitTest.cpp Fri Aug 10 12:04:27 2012 @@ -23,8 +23,8 @@ #include "unit_test.h" #include "test_tools.h" -#include "qpid/broker/QueuePolicy.h" #include "qpid/broker/QueueFlowLimit.h" +#include "qpid/broker/QueueSettings.h" #include "qpid/sys/Time.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/FieldValue.h" @@ -66,21 +66,19 @@ public: return new TestFlow(flowStopCount, flowResumeCount, flowStopSize, flowResumeSize); } - static QueueFlowLimit *getQueueFlowLimit(const qpid::framing::FieldTable& settings) + static QueueFlowLimit *getQueueFlowLimit(const qpid::framing::FieldTable& arguments) { + QueueSettings settings; + settings.populate(arguments, settings.storeSettings); return QueueFlowLimit::createLimit(0, settings); } }; - - -QueuedMessage createMessage(uint32_t size) +Message createMessage(uint32_t size) { static uint32_t seqNum; - QueuedMessage msg; - msg.payload = MessageUtils::createMessage(); - msg.position = ++seqNum; - MessageUtils::addContent(msg.payload, std::string (size, 'x')); + Message msg = MessageUtils::createMessage(qpid::types::Variant::Map(), std::string (size, 'x')); + msg.setSequence(++seqNum); return msg; } } @@ -100,7 +98,7 @@ QPID_AUTO_TEST_CASE(testFlowCount) BOOST_CHECK(!flow->isFlowControlActive()); BOOST_CHECK(flow->monitorFlowControl()); - std::deque<QueuedMessage> msgs; + std::deque<Message> msgs; for (size_t i = 0; i < 6; i++) { msgs.push_back(createMessage(10)); flow->enqueued(msgs.back()); @@ -135,7 +133,6 @@ QPID_AUTO_TEST_CASE(testFlowCount) BOOST_CHECK(!flow->isFlowControlActive()); // 4 on queue, OFF } - QPID_AUTO_TEST_CASE(testFlowSize) { FieldTable args; @@ -151,7 +148,7 @@ QPID_AUTO_TEST_CASE(testFlowSize) BOOST_CHECK(!flow->isFlowControlActive()); BOOST_CHECK(flow->monitorFlowControl()); - std::deque<QueuedMessage> msgs; + std::deque<Message> msgs; for (size_t i = 0; i < 6; i++) { msgs.push_back(createMessage(10)); flow->enqueued(msgs.back()); @@ -161,14 +158,14 @@ QPID_AUTO_TEST_CASE(testFlowSize) BOOST_CHECK_EQUAL(6u, flow->getFlowCount()); BOOST_CHECK_EQUAL(60u, flow->getFlowSize()); - QueuedMessage msg_9 = createMessage(9); + Message msg_9 = createMessage(9); flow->enqueued(msg_9); BOOST_CHECK(!flow->isFlowControlActive()); // 69 on queue - QueuedMessage tinyMsg_1 = createMessage(1); + Message tinyMsg_1 = createMessage(1); flow->enqueued(tinyMsg_1); BOOST_CHECK(!flow->isFlowControlActive()); // 70 on queue - QueuedMessage tinyMsg_2 = createMessage(1); + Message tinyMsg_2 = createMessage(1); flow->enqueued(tinyMsg_2); BOOST_CHECK(flow->isFlowControlActive()); // 71 on queue, ON msgs.push_back(createMessage(10)); @@ -233,12 +230,12 @@ QPID_AUTO_TEST_CASE(testFlowCombo) args.setUInt64(QueueFlowLimit::flowStopSizeKey, 200); args.setUInt64(QueueFlowLimit::flowResumeSizeKey, 100); - std::deque<QueuedMessage> msgs_1; - std::deque<QueuedMessage> msgs_10; - std::deque<QueuedMessage> msgs_50; - std::deque<QueuedMessage> msgs_100; + std::deque<Message> msgs_1; + std::deque<Message> msgs_10; + std::deque<Message> msgs_50; + std::deque<Message> msgs_100; - QueuedMessage msg; + Message msg; std::auto_ptr<TestFlow> flow(TestFlow::createTestFlow(args)); BOOST_CHECK(!flow->isFlowControlActive()); // count:0 size:0 @@ -458,7 +455,6 @@ QPID_AUTO_TEST_CASE(testFlowDisable) } } - QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests Modified: qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp Fri Aug 10 12:04:27 2012 @@ -22,12 +22,10 @@ #include "unit_test.h" #include "test_tools.h" -#include "qpid/broker/QueuePolicy.h" #include "qpid/broker/QueueFlowLimit.h" #include "qpid/client/QueueOptions.h" #include "qpid/sys/Time.h" #include "qpid/framing/reply_exceptions.h" -#include "MessageUtils.h" #include "BrokerFixture.h" using namespace qpid::broker; @@ -39,118 +37,10 @@ namespace tests { QPID_AUTO_TEST_SUITE(QueuePolicyTestSuite) -namespace { -QueuedMessage createMessage(uint32_t size) -{ - QueuedMessage msg; - msg.payload = MessageUtils::createMessage(); - MessageUtils::addContent(msg.payload, std::string (size, 'x')); - return msg; -} -} - -QPID_AUTO_TEST_CASE(testCount) -{ - std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", 5, 0)); - BOOST_CHECK_EQUAL((uint64_t) 0, policy->getMaxSize()); - BOOST_CHECK_EQUAL((uint32_t) 5, policy->getMaxCount()); - - QueuedMessage msg = createMessage(10); - for (size_t i = 0; i < 5; i++) { - policy->tryEnqueue(msg.payload); - } - try { - policy->tryEnqueue(msg.payload); - BOOST_FAIL("Policy did not fail on enqueuing sixth message"); - } catch (const ResourceLimitExceededException&) {} - - policy->dequeued(msg); - policy->tryEnqueue(msg.payload); - - try { - policy->tryEnqueue(msg.payload); - BOOST_FAIL("Policy did not fail on enqueuing sixth message (after dequeue)"); - } catch (const ResourceLimitExceededException&) {} -} - -QPID_AUTO_TEST_CASE(testSize) -{ - std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", 0, 50)); - QueuedMessage msg = createMessage(10); - - for (size_t i = 0; i < 5; i++) { - policy->tryEnqueue(msg.payload); - } - try { - policy->tryEnqueue(msg.payload); - BOOST_FAIL("Policy did not fail on aggregate size exceeding 50. " << *policy); - } catch (const ResourceLimitExceededException&) {} - - policy->dequeued(msg); - policy->tryEnqueue(msg.payload); - - try { - policy->tryEnqueue(msg.payload); - BOOST_FAIL("Policy did not fail on aggregate size exceeding 50 (after dequeue). " << *policy); - } catch (const ResourceLimitExceededException&) {} -} - -QPID_AUTO_TEST_CASE(testBoth) -{ - std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", 5, 50)); - try { - QueuedMessage msg = createMessage(51); - policy->tryEnqueue(msg.payload); - BOOST_FAIL("Policy did not fail on single message exceeding 50. " << *policy); - } catch (const ResourceLimitExceededException&) {} - - std::vector<QueuedMessage> messages; - messages.push_back(createMessage(15)); - messages.push_back(createMessage(10)); - messages.push_back(createMessage(11)); - messages.push_back(createMessage(2)); - messages.push_back(createMessage(7)); - for (size_t i = 0; i < messages.size(); i++) { - policy->tryEnqueue(messages[i].payload); - } - //size = 45 at this point, count = 5 - try { - QueuedMessage msg = createMessage(5); - policy->tryEnqueue(msg.payload); - BOOST_FAIL("Policy did not fail on count exceeding 6. " << *policy); - } catch (const ResourceLimitExceededException&) {} - try { - QueuedMessage msg = createMessage(10); - policy->tryEnqueue(msg.payload); - BOOST_FAIL("Policy did not fail on aggregate size exceeding 50. " << *policy); - } catch (const ResourceLimitExceededException&) {} - - - policy->dequeued(messages[0]); - try { - QueuedMessage msg = createMessage(20); - policy->tryEnqueue(msg.payload); - } catch (const ResourceLimitExceededException&) { - BOOST_FAIL("Policy failed incorrectly after dequeue. " << *policy); - } -} - -QPID_AUTO_TEST_CASE(testSettings) -{ - //test reading and writing the policy from/to field table - std::auto_ptr<QueuePolicy> a(QueuePolicy::createQueuePolicy("test", 101, 303)); - FieldTable settings; - a->update(settings); - std::auto_ptr<QueuePolicy> b(QueuePolicy::createQueuePolicy("test", settings)); - BOOST_CHECK_EQUAL(a->getMaxCount(), b->getMaxCount()); - BOOST_CHECK_EQUAL(a->getMaxSize(), b->getMaxSize()); -} - QPID_AUTO_TEST_CASE(testRingPolicyCount) { - FieldTable args; - std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::RING); - policy->update(args); + QueueOptions args; + args.setSizePolicy(RING, 0, 5); SessionFixture f; std::string q("my-ring-queue"); @@ -183,9 +73,8 @@ QPID_AUTO_TEST_CASE(testRingPolicySize) // Ring queue, 500 bytes maxSize - FieldTable args; - std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 0, 500, QueuePolicy::RING); - policy->update(args); + QueueOptions args; + args.setSizePolicy(RING, 500, 0); SessionFixture f; std::string q("my-ring-queue"); @@ -255,9 +144,9 @@ QPID_AUTO_TEST_CASE(testRingPolicySize) QPID_AUTO_TEST_CASE(testStrictRingPolicy) { - FieldTable args; - std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::RING_STRICT); - policy->update(args); + QueueOptions args; + args.setSizePolicy(RING_STRICT, 0, 5); + args.setString("qpid.flow_stop_count", "0"); SessionFixture f; std::string q("my-ring-queue"); @@ -281,9 +170,8 @@ QPID_AUTO_TEST_CASE(testStrictRingPolicy QPID_AUTO_TEST_CASE(testPolicyWithDtx) { - FieldTable args; - std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::REJECT); - policy->update(args); + QueueOptions args; + args.setSizePolicy(REJECT, 0, 5); SessionFixture f; std::string q("my-policy-queue"); @@ -367,9 +255,8 @@ QPID_AUTO_TEST_CASE(testFlowToDiskWithNo QPID_AUTO_TEST_CASE(testPolicyFailureOnCommit) { - FieldTable args; - std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::REJECT); - policy->update(args); + QueueOptions args; + args.setSizePolicy(REJECT, 0, 5); SessionFixture f; std::string q("q"); Modified: qpid/trunk/qpid/cpp/src/tests/QueueRegistryTest.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueRegistryTest.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/QueueRegistryTest.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/QueueRegistryTest.cpp Fri Aug 10 12:04:27 2012 @@ -19,6 +19,7 @@ #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/Queue.h" +#include "qpid/broker/QueueSettings.h" #include "unit_test.h" #include <string> @@ -36,33 +37,23 @@ QPID_AUTO_TEST_CASE(testDeclare) QueueRegistry reg; std::pair<Queue::shared_ptr, bool> qc; - qc = reg.declare(foo, false, 0, 0); + qc = reg.declare(foo, QueueSettings()); Queue::shared_ptr q = qc.first; BOOST_CHECK(q); BOOST_CHECK(qc.second); // New queue BOOST_CHECK_EQUAL(foo, q->getName()); - qc = reg.declare(foo, false, 0, 0); + qc = reg.declare(foo, QueueSettings()); BOOST_CHECK_EQUAL(q, qc.first); BOOST_CHECK(!qc.second); - qc = reg.declare(bar, false, 0, 0); + qc = reg.declare(bar, QueueSettings()); q = qc.first; BOOST_CHECK(q); BOOST_CHECK_EQUAL(true, qc.second); BOOST_CHECK_EQUAL(bar, q->getName()); } -QPID_AUTO_TEST_CASE(testDeclareTmp) -{ - QueueRegistry reg; - std::pair<Queue::shared_ptr, bool> qc; - - qc = reg.declare(std::string(), false, 0, 0); - BOOST_CHECK(qc.second); - BOOST_CHECK_EQUAL(std::string("tmp_1"), qc.first->getName()); -} - QPID_AUTO_TEST_CASE(testFind) { std::string foo("foo"); @@ -72,8 +63,8 @@ QPID_AUTO_TEST_CASE(testFind) BOOST_CHECK(reg.find(foo) == 0); - reg.declare(foo, false, 0, 0); - reg.declare(bar, false, 0, 0); + reg.declare(foo, QueueSettings()); + reg.declare(bar, QueueSettings()); Queue::shared_ptr q = reg.find(bar); BOOST_CHECK(q); BOOST_CHECK_EQUAL(bar, q->getName()); @@ -85,7 +76,7 @@ QPID_AUTO_TEST_CASE(testDestroy) QueueRegistry reg; std::pair<Queue::shared_ptr, bool> qc; - qc = reg.declare(foo, false, 0, 0); + qc = reg.declare(foo, QueueSettings()); reg.destroy(foo); // Queue is gone from the registry. BOOST_CHECK(reg.find(foo) == 0); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
