Added: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp?rev=1073448&view=auto ============================================================================== --- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp (added) +++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp Tue Feb 22 18:23:06 2011 @@ -0,0 +1,105 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +#include "Core.h" +#include "MessageHandler.h" +#include "BrokerHandler.h" +#include "EventHandler.h" +#include "qpid/broker/Message.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/QueueRegistry.h" +#include "qpid/broker/Queue.h" +#include "qpid/framing/AllInvoker.h" +#include "qpid/framing/Buffer.h" +#include "qpid/sys/Thread.h" +#include "qpid/log/Statement.h" +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace cluster { +using namespace broker; + +MessageHandler::MessageHandler(EventHandler& e) : + HandlerBase(e), + broker(e.getCore().getBroker()) +{} + +bool MessageHandler::invoke(const framing::AMQBody& body) { + return framing::invoke(*this, body).wasHandled(); +} + +void MessageHandler::routing(RoutingId routingId, const std::string& message) { + if (sender() == self()) return; // Already in getCore().getRoutingMap() + boost::intrusive_ptr<Message> msg = new Message; + // FIXME aconway 2010-10-28: decode message in bounded-size buffers. + framing::Buffer buf(const_cast<char*>(&message[0]), message.size()); + msg->decodeHeader(buf); + msg->decodeContent(buf); + memberMap[sender()].routingMap[routingId] = msg; +} + +boost::shared_ptr<broker::Queue> MessageHandler::findQueue( + const std::string& q, const char* msg) +{ + boost::shared_ptr<Queue> queue = broker.getQueues().find(q); + if (!queue) throw Exception(QPID_MSG(msg << ": unknown queue " << q)); + return queue; +} + +void MessageHandler::enqueue(RoutingId routingId, const std::string& q) { + boost::shared_ptr<Queue> queue = findQueue(q, "Cluster enqueue failed"); + boost::intrusive_ptr<Message> msg; + if (sender() == self()) + msg = eventHandler.getCore().getRoutingMap().get(routingId); + else + msg = memberMap[sender()].routingMap[routingId]; + if (!msg) throw Exception(QPID_MSG("Cluster enqueue on " << q + << " failed: unknown message")); + BrokerHandler::ScopedSuppressReplication ssr; + queue->deliver(msg); +} + +void MessageHandler::routed(RoutingId routingId) { + 
if (sender() == self()) + eventHandler.getCore().getRoutingMap().erase(routingId); + else + memberMap[sender()].routingMap.erase(routingId); +} + +void MessageHandler::dequeue(const std::string& q, uint32_t position) { + if (sender() == self()) { + // FIXME aconway 2010-10-28: we should complete the ack that initiated + // the dequeue at this point, see BrokerHandler::dequeue + return; + } + boost::shared_ptr<Queue> queue = findQueue(q, "Cluster dequeue failed"); + BrokerHandler::ScopedSuppressReplication ssr; + QueuedMessage qm; + // FIXME aconway 2010-10-28: when we replicate acquires, the acquired + // messages will be stored by MessageHandler::acquire. + if (queue->acquireMessageAt(position, qm)) { + assert(qm.position.getValue() == position); + assert(qm.payload); + queue->dequeue(0, qm); + } +} + +}} // namespace qpid::cluster
Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp ------------------------------------------------------------------------------ svn:eol-style = native Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h?rev=1073448&view=auto ============================================================================== --- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h (added) +++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h Tue Feb 22 18:23:06 2011 @@ -0,0 +1,73 @@ +#ifndef QPID_CLUSTER_MESSAGEHANDLER_H +#define QPID_CLUSTER_MESSAGEHANDLER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +// TODO aconway 2010-10-19: experimental cluster code. 

#include "HandlerBase.h"
#include "qpid/framing/AMQP_AllOperations.h"
#include <boost/intrusive_ptr.hpp>
#include <map>

namespace qpid {

namespace broker {
class Message;
class Broker;
class Queue;
}

namespace cluster {
class EventHandler;
class BrokerHandler;

/**
 * Handler for message disposition events.
 *
 * Receives the cluster-message operations (routing, enqueue, routed,
 * dequeue) and applies them to the local broker. Messages announced by
 * other members are kept per member, keyed by routing id, until the
 * matching routed event arrives.
 */
class MessageHandler : public framing::AMQP_AllOperations::ClusterMessageHandler,
                       public HandlerBase
{
  public:
    MessageHandler(EventHandler&);

    /** Dispatch body to one of the operations below; true if it was handled. */
    bool invoke(const framing::AMQBody& body);

    /** A member is routing the encoded message, identified by routingId. */
    void routing(uint32_t routingId, const std::string& message);
    /** Deliver the message identified by routingId to the named queue. */
    void enqueue(uint32_t routingId, const std::string& queue);
    /** Routing of the message identified by routingId is complete. */
    void routed(uint32_t routingId);
    /** The message at position was dequeued from the named queue. */
    void dequeue(const std::string& queue, uint32_t position);
  private:
    /** State kept for one cluster member. */
    struct Member {
        /** Messages announced by the member, keyed by routing id. */
        typedef std::map<uint32_t, boost::intrusive_ptr<broker::Message> > RoutingMap;
        RoutingMap routingMap;
    };
    typedef std::map<MemberId, Member> MemberMap;

    /** Find queue q in the broker; msg prefixes the error text if it is missing. */
    boost::shared_ptr<broker::Queue> findQueue(const std::string& q, const char* msg);

    broker::Broker& broker;
    MemberMap memberMap;
};
}} // namespace qpid::cluster

#endif /*!QPID_CLUSTER_MESSAGEHANDLER_H*/

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/README.txt
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/README.txt?rev=1073448&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/README.txt (added)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/README.txt Tue Feb 22 18:23:06 2011
@@ -0,0 +1,2 @@
+
+Experimental code to test ideas about a new cluster design. Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/README.txt ------------------------------------------------------------------------------ svn:eol-style = native Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/README.txt ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp?rev=1073448&view=auto ============================================================================== --- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp (added) +++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp Tue Feb 22 18:23:06 2011 @@ -0,0 +1,111 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +#include "Core.h" +#include "WiringHandler.h" +#include "EventHandler.h" +#include "BrokerHandler.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/Exchange.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/ExchangeRegistry.h" +#include "qpid/broker/QueueRegistry.h" +#include "qpid/framing/AllInvoker.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/log/Statement.h" +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace cluster { +using namespace broker; +using framing::FieldTable; + +WiringHandler::WiringHandler(EventHandler& e) : + HandlerBase(e), + broker(e.getCore().getBroker()), + recovery(broker.getQueues(), broker.getExchanges(), + broker.getLinks(), broker.getDtxManager()) +{} + +bool WiringHandler::invoke(const framing::AMQBody& body) { + return framing::invoke(*this, body).wasHandled(); +} + +void WiringHandler::createQueue(const std::string& data) { + if (sender() == self()) return; + BrokerHandler::ScopedSuppressReplication ssr; + framing::Buffer buf(const_cast<char*>(&data[0]), data.size()); + // TODO aconway 2011-02-21: asymetric - RecoveryManager vs Broker::create*() + RecoverableQueue::shared_ptr queue = recovery.recoverQueue(buf); + QPID_LOG(debug, "cluster: create queue " << queue->getName()); +} + +void WiringHandler::destroyQueue(const std::string& name) { + if (sender() == self()) return; + QPID_LOG(debug, "cluster: destroy queue " << name); + BrokerHandler::ScopedSuppressReplication ssr; + broker.deleteQueue(name, std::string(), std::string()); +} + +void WiringHandler::createExchange(const std::string& data) { + if (sender() == self()) return; + BrokerHandler::ScopedSuppressReplication ssr; + framing::Buffer buf(const_cast<char*>(&data[0]), data.size()); + // TODO aconway 2011-02-21: asymetric - RecoveryManager vs Broker::create*() + RecoverableExchange::shared_ptr exchange = recovery.recoverExchange(buf); + QPID_LOG(debug, "cluster: create exchange " << 
exchange->getName()); +} + +void WiringHandler::destroyExchange(const std::string& name) { + if (sender() == self()) return; + QPID_LOG(debug, "cluster: destroy exchange " << name); + BrokerHandler::ScopedSuppressReplication ssr; + broker.getExchanges().destroy(name); +} + +void WiringHandler::bind( + const std::string& queueName, const std::string& exchangeName, + const std::string& routingKey, const FieldTable& arguments) +{ + if (sender() == self()) return; + QPID_LOG(debug, "cluster: bind queue=" << queueName + << " exchange=" << exchangeName + << " key=" << routingKey + << " arguments=" << arguments); + BrokerHandler::ScopedSuppressReplication ssr; + broker.bind(queueName, exchangeName, routingKey, arguments, std::string(), std::string()); +} + +void WiringHandler::unbind( + const std::string& queueName, const std::string& exchangeName, + const std::string& routingKey, const FieldTable& arguments) +{ + if (sender() == self()) return; + QPID_LOG(debug, "cluster: unbind queue=" << queueName + << " exchange=" << exchangeName + << " key=" << routingKey + << " arguments=" << arguments); + BrokerHandler::ScopedSuppressReplication ssr; + broker.unbind(queueName, exchangeName, routingKey, std::string(), std::string()); +} + +}} // namespace qpid::cluster Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp ------------------------------------------------------------------------------ svn:eol-style = native Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h?rev=1073448&view=auto ============================================================================== --- 
qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h (added) +++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h Tue Feb 22 18:23:06 2011 @@ -0,0 +1,75 @@ +#ifndef QPID_CLUSTER_WIRINGHANDLER_H +#define QPID_CLUSTER_WIRINGHANDLER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +// TODO aconway 2010-10-19: experimental cluster code. + +#include "HandlerBase.h" +#include "qpid/broker/RecoveryManagerImpl.h" +#include "qpid/framing/AMQP_AllOperations.h" +#include <boost/intrusive_ptr.hpp> +#include <map> + +namespace qpid { + +namespace framing { +class FieldTable; +} + +namespace broker { +class Broker; +} + +namespace cluster { +class EventHandler; + + +/** + * Handler for wiring disposition events. 
 *
 * Receives the cluster-wiring operations (queue and exchange creation and
 * destruction, bind and unbind) and applies them to the local broker.
 */
class WiringHandler : public framing::AMQP_AllOperations::ClusterWiringHandler,
                      public HandlerBase
{
  public:
    WiringHandler(EventHandler&);

    /** Dispatch body to one of the operations below; true if it was handled. */
    bool invoke(const framing::AMQBody& body);

    /** A queue was created; data is the encoded queue. */
    void createQueue(const std::string& data);
    /** The queue called name was destroyed. */
    void destroyQueue(const std::string& name);
    /** An exchange was created; data is the encoded exchange. */
    void createExchange(const std::string& data);
    /** The exchange called name was destroyed. */
    void destroyExchange(const std::string& name);
    /** queue was bound to exchange with the given key and arguments. */
    void bind(const std::string& queue, const std::string& exchange,
              const std::string& routingKey, const framing::FieldTable& arguments);
    /** The binding of queue to exchange with the given key was removed. */
    void unbind(const std::string& queue, const std::string& exchange,
                const std::string& routingKey, const framing::FieldTable& arguments);


  private:
    broker::Broker& broker;
    // Used to rebuild queues and exchanges from their encoded form.
    broker::RecoveryManagerImpl recovery;
};

}} // namespace qpid::cluster

#endif /*!QPID_CLUSTER_WIRINGHANDLER_H*/

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/types.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/types.h?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/types.h (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/types.h Tue Feb 22 18:23:06 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -37,7 +37,7 @@ extern "C" { # include <corosync/cpg.h> #else # error "No cpg.h header file available" -#endif +#endif } namespace qpid { @@ -78,6 +78,9 @@ std::ostream& operator<<(std::ostream&, std::ostream& operator<<(std::ostream&, EventType); +/** Number to identify a message being routed. */ +typedef uint32_t RoutingId; + }} // namespace qpid::cluster #endif /*!QPID_CLUSTER_TYPES_H*/ Added: qpid/branches/qpid-2920/qpid/cpp/src/tests/BrokerClusterCalls.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/tests/BrokerClusterCalls.cpp?rev=1073448&view=auto ============================================================================== --- qpid/branches/qpid-2920/qpid/cpp/src/tests/BrokerClusterCalls.cpp (added) +++ qpid/branches/qpid-2920/qpid/cpp/src/tests/BrokerClusterCalls.cpp Tue Feb 22 18:23:06 2011 @@ -0,0 +1,419 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +///@file +// Tests using a dummy broker::Cluster implementation to verify the expected +// Cluster functions are called for various actions on the broker. 
+// + +#include "unit_test.h" +#include "test_tools.h" +#include "qpid/broker/Cluster.h" +#include "qpid/broker/Queue.h" +#include "qpid/client/Connection.h" +#include "qpid/client/Session.h" +#include "qpid/messaging/Connection.h" +#include "qpid/messaging/Session.h" +#include "qpid/messaging/Sender.h" +#include "qpid/messaging/Receiver.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/Duration.h" +#include "BrokerFixture.h" +#include <boost/assign.hpp> +#include <boost/format.hpp> + +using namespace std; +using namespace boost; +using namespace boost::assign; +using namespace qpid::messaging; +using boost::format; +using boost::intrusive_ptr; + +namespace qpid { +namespace tests { + +class DummyCluster : public broker::Cluster +{ + private: + /** Flag used to ignore events other than enqueues while routing, + * e.g. acquires and accepts generated in a ring queue to replace an element.. + * In real impl would be a thread-local variable. + */ + bool isRouting; + + void recordQm(const string& op, const broker::QueuedMessage& qm) { + history += (format("%s(%s, %d, %s)") % op % qm.queue->getName() + % qm.position % qm.payload->getFrames().getContent()).str(); + } + void recordMsg(const string& op, broker::Queue& q, intrusive_ptr<broker::Message> msg) { + history += (format("%s(%s, %s)") % op % q.getName() % msg->getFrames().getContent()).str(); + } + void recordStr(const string& op, const string& name) { + history += (format("%s(%s)") % op % name).str(); + } + public: + // Messages + + virtual void routing(const boost::intrusive_ptr<broker::Message>& m) { + isRouting = true; + history += (format("routing(%s)") % m->getFrames().getContent()).str(); + } + + virtual bool enqueue(broker::Queue& q, const intrusive_ptr<broker::Message>&msg) { + recordMsg("enqueue", q, msg); + return true; + } + + virtual void routed(const boost::intrusive_ptr<broker::Message>& m) { + history += (format("routed(%s)") % m->getFrames().getContent()).str(); + isRouting = false; 
+ } + virtual void acquire(const broker::QueuedMessage& qm) { + if (!isRouting) recordQm("acquire", qm); + } + virtual void release(const broker::QueuedMessage& qm) { + if (!isRouting) recordQm("release", qm); + } + virtual void dequeue(const broker::QueuedMessage& qm) { + if (!isRouting) recordQm("dequeue", qm); + } + + // Consumers + + virtual void consume(const broker::Queue& q, size_t n) { + history += (format("consume(%s, %d)") % q.getName() % n).str(); + } + virtual void cancel(const broker::Queue& q, size_t n) { + history += (format("cancel(%s, %d)") % q.getName() % n).str(); + } + + // Wiring + + virtual void create(const broker::Queue& q) { recordStr("createq", q.getName()); } + virtual void destroy(const broker::Queue& q) { recordStr("destroyq", q.getName()); } + virtual void create(const broker::Exchange& ex) { recordStr("createex", ex.getName()); } + virtual void destroy(const broker::Exchange& ex) { recordStr("destroyex", ex.getName()); } + virtual void bind(const broker::Queue& q, const broker::Exchange& ex, const std::string& key, const framing::FieldTable& /*args*/) { + history += (format("bind(%s, %s, %s)") % q.getName() % ex.getName() % key).str(); + } + virtual void unbind(const broker::Queue& q, const broker::Exchange& ex, const std::string& key, const framing::FieldTable& /*args*/) { + history += (format("unbind(%s, %s, %s)")% q.getName()%ex.getName()%key).str(); + } + vector<string> history; +}; + +QPID_AUTO_TEST_SUITE(BrokerClusterCallsTestSuite) + +// Broker fixture with DummyCluster set up and some new API client bits. 
// Installs a DummyCluster in the fixture's broker, then opens a messaging
// connection to the broker's port on localhost and creates a session on it.
// Tests read the recorded calls through dc->history.
struct DummyClusterFixture: public BrokerFixture {
    Connection c;
    Session s;
    DummyCluster*dc;            // Points at the cluster object held by the broker.
    DummyClusterFixture() {
        broker->setCluster(auto_ptr<broker::Cluster>(new DummyCluster));
        // Fetch the cluster back from the broker to get a typed pointer to it.
        dc = &static_cast<DummyCluster&>(broker->getCluster());
        c = Connection("localhost:"+lexical_cast<string>(getPort()));
        c.open();
        s = c.createSession();
    }
    ~DummyClusterFixture() {
        c.close();
    }
};

// Walks one message through a queue (create, consume, send, fetch,
// acknowledge, cancel, delete) and checks the cluster call recorded at each step.
QPID_AUTO_TEST_CASE(testSimplePubSub) {
    DummyClusterFixture f;
    vector<string>& h = f.dc->history;

    // Queue creation
    Sender sender = f.s.createSender("q;{create:always,delete:always}");
    size_t i = 0;               // Index of the next history entry to check.
    BOOST_CHECK_EQUAL(h.at(i++), "createq(q)"); // Note: at() does bounds checking.
    BOOST_CHECK_EQUAL(h.size(), i);

    // Consumer
    Receiver receiver = f.s.createReceiver("q");
    f.s.sync();
    BOOST_CHECK_EQUAL(h.at(i++), "consume(q, 1)");
    BOOST_CHECK_EQUAL(h.size(), i);

    // Send message
    sender.send(Message("a"));
    f.s.sync();
    BOOST_CHECK_EQUAL(h.at(i++), "routing(a)");
    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, a)");
    BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
    // Don't check size here as it is uncertain whether acquire has happened yet.

    // Acquire message
    Message m = receiver.fetch(Duration::SECOND);
    BOOST_CHECK_EQUAL(h.at(i++), "acquire(q, 1, a)");
    BOOST_CHECK_EQUAL(h.size(), i);

    // Acknowledge message
    f.s.acknowledge(true);
    f.s.sync();
    BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)");
    BOOST_CHECK_EQUAL(h.size(), i);

    // Close a consumer
    receiver.close();
    BOOST_CHECK_EQUAL(h.at(i++), "cancel(q, 0)");
    BOOST_CHECK_EQUAL(h.size(), i);

    // Destroy the queue
    f.c.close();
    BOOST_CHECK_EQUAL(h.at(i++), "destroyq(q)");
    BOOST_CHECK_EQUAL(h.size(), i);
}

// Checks the calls recorded when a message leaves a consumer without being
// accepted: explicit release, release on connection close, reject to an
// alternate exchange, TTL expiry, and replacement on a last-value queue.
QPID_AUTO_TEST_CASE(testReleaseReject) {
    DummyClusterFixture f;
    vector<string>& h = f.dc->history;

    Sender sender = f.s.createSender("q;{create:always,delete:always,node:{x-declare:{alternate-exchange:amq.fanout}}}");
    sender.send(Message("a"));
    Receiver receiver = f.s.createReceiver("q");
    Receiver altReceiver = f.s.createReceiver("amq.fanout;{link:{name:altq}}");
    Message m = receiver.fetch(Duration::SECOND);
    h.clear();                  // Discard the setup calls; only check what follows.

    // Explicit release
    f.s.release(m);
    f.s.sync();
    size_t i = 0;
    BOOST_CHECK_EQUAL(h.at(i++), "release(q, 1, a)");
    BOOST_CHECK_EQUAL(h.size(), i);

    // Implicit release on closing connection.
    Connection c("localhost:"+lexical_cast<string>(f.getPort()));
    c.open();
    Session s = c.createSession();
    Receiver r = s.createReceiver("q");
    m = r.fetch(Duration::SECOND);
    h.clear();
    i = 0;
    c.close();
    BOOST_CHECK_EQUAL(h.at(i++), "cancel(q, 1)");
    BOOST_CHECK_EQUAL(h.at(i++), "release(q, 1, a)");
    BOOST_CHECK_EQUAL(h.size(), i);

    // Reject message, goes to alternate exchange.
    m = receiver.fetch(Duration::SECOND);
    h.clear();
    i = 0;
    f.s.reject(m);
    BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); // Routing to alt exchange
    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(amq.fanout_altq, a)");
    BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
    BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)");
    BOOST_CHECK_EQUAL(h.size(), i);
    m = altReceiver.fetch(Duration::SECOND);
    BOOST_CHECK_EQUAL(m.getContent(), "a");

    // Timed out message
    h.clear();
    i = 0;
    m = Message("t");
    m.setTtl(Duration(1)); // Timeout 1ms
    sender.send(m);
    usleep(2000); // Sleep 2ms
    bool received = receiver.fetch(m, Duration::IMMEDIATE);
    BOOST_CHECK(!received); // Timed out
    BOOST_CHECK_EQUAL(h.at(i++), "routing(t)");
    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, t)");
    BOOST_CHECK_EQUAL(h.at(i++), "routed(t)");
    BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 2, t)");
    BOOST_CHECK_EQUAL(h.size(), i);

    // Message replaced on LVQ
    // Note: history is not cleared here, so i keeps counting from the
    // timed-out-message checks above.
    sender = f.s.createSender("lvq;{create:always,delete:always,node:{x-declare:{arguments:{qpid.last_value_queue:1}}}}");
    m = Message("a");
    m.getProperties()["qpid.LVQ_key"] = "foo";
    sender.send(m);
    f.s.sync();
    BOOST_CHECK_EQUAL(h.at(i++), "createq(lvq)");
    BOOST_CHECK_EQUAL(h.at(i++), "routing(a)");
    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, a)");
    BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
    BOOST_CHECK_EQUAL(h.size(), i);

    // Second message with the same LVQ key; the test expects only the
    // routing/enqueue/routed calls to be recorded for it.
    m = Message("b");
    m.getProperties()["qpid.LVQ_key"] = "foo";
    sender.send(m);
    f.s.sync();
    BOOST_CHECK_EQUAL(h.at(i++), "routing(b)");
    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, b)");
    BOOST_CHECK_EQUAL(h.at(i++), "routed(b)");
    BOOST_CHECK_EQUAL(h.size(), i);

    receiver = f.s.createReceiver("lvq");
    BOOST_CHECK_EQUAL(receiver.fetch(Duration::SECOND).getContent(), "b");
    f.s.acknowledge(true);
    BOOST_CHECK_EQUAL(h.at(i++), "consume(lvq, 1)");
    BOOST_CHECK_EQUAL(h.at(i++), "acquire(lvq, 1, b)");
    BOOST_CHECK_EQUAL(h.at(i++), "dequeue(lvq, 1, b)");
    BOOST_CHECK_EQUAL(h.size(), i);
}

// Sends one message through amq.fanout to two subscription queues and checks
// that a single routing/routed pair brackets one enqueue per queue.
QPID_AUTO_TEST_CASE(testFanout) {
    DummyClusterFixture f;
    vector<string>& h = f.dc->history;

    Receiver r1 = f.s.createReceiver("amq.fanout;{link:{name:r1}}");
    Receiver r2 = f.s.createReceiver("amq.fanout;{link:{name:r2}}");
    Sender sender = f.s.createSender("amq.fanout");
    r1.setCapacity(0);          // Don't receive immediately.
    r2.setCapacity(0);
    h.clear();                  // Discard the setup calls; only check what follows.
    size_t i = 0;

    // Send message
    sender.send(Message("a"));
    f.s.sync();
    BOOST_CHECK_EQUAL(h.at(i++), "routing(a)");
    // The order of the two enqueues is not fixed, so only the common prefix
    // is matched and the two entries are required to differ.
    BOOST_CHECK_EQUAL(0u, h.at(i++).find("enqueue(amq.fanout_r"));
    BOOST_CHECK_EQUAL(0u, h.at(i++).find("enqueue(amq.fanout_r"));
    BOOST_CHECK(h.at(i-1) != h.at(i-2));
    BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
    BOOST_CHECK_EQUAL(h.size(), i);

    // Receive messages
    Message m1 = r1.fetch(Duration::SECOND);
    f.s.acknowledge(m1, true);
    Message m2 = r2.fetch(Duration::SECOND);
    f.s.acknowledge(m2, true);

    BOOST_CHECK_EQUAL(h.at(i++), "acquire(amq.fanout_r1, 1, a)");
    BOOST_CHECK_EQUAL(h.at(i++), "dequeue(amq.fanout_r1, 1, a)");
    BOOST_CHECK_EQUAL(h.at(i++), "acquire(amq.fanout_r2, 1, a)");
    BOOST_CHECK_EQUAL(h.at(i++), "dequeue(amq.fanout_r2, 1, a)");
    BOOST_CHECK_EQUAL(h.size(), i);
}

// Sends four messages to a ring queue declared with qpid.max_size 3 and
// checks that only routing/enqueue/routed calls are recorded for the sends.
// The test then expects b, c and d (not a) to be fetched.
QPID_AUTO_TEST_CASE(testRingQueue) {
    DummyClusterFixture f;
    vector<string>& h = f.dc->history;

    // FIXME aconway 2010-10-15: QPID-2908 ring queue address string is not working,
    // so we can't do this:
    // Sender sender = f.s.createSender("ring;{create:always,node:{x-declare:{arguments:{qpid.max_size:3,qpid.policy_type:ring}}}}");
    // Must use old API to declare ring queue:
    qpid::client::Connection c;
    f.open(c);
    qpid::client::Session s = c.newSession();
    qpid::framing::FieldTable args;
    args.setInt("qpid.max_size", 3);
    args.setString("qpid.policy_type","ring");
    s.queueDeclare(qpid::client::arg::queue="ring", qpid::client::arg::arguments=args);
    c.close();
    Sender sender = f.s.createSender("ring");

    size_t i = 0;
    // Send message
    sender.send(Message("a"));
    sender.send(Message("b"));
    sender.send(Message("c"));
    sender.send(Message("d"));
    f.s.sync();

    BOOST_CHECK_EQUAL(h.at(i++), "createq(ring)");

    BOOST_CHECK_EQUAL(h.at(i++), "routing(a)");
    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, a)");
    BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");

    BOOST_CHECK_EQUAL(h.at(i++), "routing(b)");
    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, b)");
    BOOST_CHECK_EQUAL(h.at(i++), "routed(b)");

    BOOST_CHECK_EQUAL(h.at(i++), "routing(c)");
    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, c)");
    BOOST_CHECK_EQUAL(h.at(i++), "routed(c)");

    // No acquire/dequeue of an earlier message is expected between
    // routing(d) and routed(d).
    BOOST_CHECK_EQUAL(h.at(i++), "routing(d)");
    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, d)");
    BOOST_CHECK_EQUAL(h.at(i++), "routed(d)");

    Receiver receiver = f.s.createReceiver("ring");
    BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "b");
    BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "c");
    BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "d");
    f.s.acknowledge(true);

    BOOST_CHECK_EQUAL(h.at(i++), "consume(ring, 1)");
    BOOST_CHECK_EQUAL(h.at(i++), "acquire(ring, 2, b)");
    BOOST_CHECK_EQUAL(h.at(i++), "acquire(ring, 3, c)");
    BOOST_CHECK_EQUAL(h.at(i++), "acquire(ring, 4, d)");
    BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 2, b)");
    BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 3, c)");
    BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 4, d)");

    BOOST_CHECK_EQUAL(h.size(), i);
}

// Records the cluster calls made for sends and acknowledgements on a
// transactional session. The commented-out checks mark calls that the
// FIXME below says are currently missing.
QPID_AUTO_TEST_CASE(testTransactions) {
    DummyClusterFixture f;
    vector<string>& h = f.dc->history;
    Session ts = f.c.createTransactionalSession();
    Sender sender = ts.createSender("q;{create:always,delete:always}");
    size_t i = 0;
    BOOST_CHECK_EQUAL(h.at(i++), "createq(q)"); // Note: at() does bounds checking.
    BOOST_CHECK_EQUAL(h.size(), i);

    sender.send(Message("a"));
    sender.send(Message("b"));
    ts.sync();
    BOOST_CHECK_EQUAL(h.at(i++), "routing(a)");
    BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
    BOOST_CHECK_EQUAL(h.at(i++), "routing(b)");
    BOOST_CHECK_EQUAL(h.at(i++), "routed(b)");
    BOOST_CHECK_EQUAL(h.size(), i); // Not replicated till commit
    ts.commit();
    // FIXME aconway 2010-10-18: As things stand the cluster is not
    // compatible with transactions
    // - enqueues occur after routing is complete
    // - no call to Cluster::enqueue, should be in Queue::process?
    // - no transaction context associated with messages in the Cluster interface.
    // - no call to Cluster::accept in Queue::dequeueCommitted
    // BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, a)");
    // BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, b)");
    BOOST_CHECK_EQUAL(h.size(), i);


    Receiver receiver = ts.createReceiver("q");
    BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "a");
    BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "b");
    ts.acknowledge();
    ts.sync();
    BOOST_CHECK_EQUAL(h.at(i++), "consume(q, 1)");
    BOOST_CHECK_EQUAL(h.at(i++), "acquire(q, 1, a)");
    BOOST_CHECK_EQUAL(h.at(i++), "acquire(q, 2, b)");
    BOOST_CHECK_EQUAL(h.size(), i);
    // The dequeues are only expected after the commit.
    ts.commit();
    ts.sync();
    // BOOST_CHECK_EQUAL(h.at(i++), "accept(q, 1, a)");
    BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)");
    // BOOST_CHECK_EQUAL(h.at(i++), "accept(q, 2, b)");
    BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 2, b)");
    BOOST_CHECK_EQUAL(h.size(), i);
}

QPID_AUTO_TEST_SUITE_END()

}} // namespace qpid::tests

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/tests/BrokerClusterCalls.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/tests/BrokerClusterCalls.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: 
qpid/branches/qpid-2920/qpid/cpp/src/tests/Makefile.am URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/tests/Makefile.am?rev=1073448&r1=1073447&r2=1073448&view=diff ============================================================================== --- qpid/branches/qpid-2920/qpid/cpp/src/tests/Makefile.am (original) +++ qpid/branches/qpid-2920/qpid/cpp/src/tests/Makefile.am Tue Feb 22 18:23:06 2011 @@ -25,7 +25,7 @@ QMF_GEN=$(top_srcdir)/managementgen/qmf- abs_builddir=@abs_builddir@ abs_srcdir=@abs_srcdir@ -extra_libs = +extra_libs = lib_client = $(abs_builddir)/../libqpidclient.la lib_messaging = $(abs_builddir)/../libqpidmessaging.la lib_common = $(abs_builddir)/../libqpidcommon.la @@ -36,7 +36,7 @@ lib_qmf2 = $(abs_builddir)/../libqmf2.la # # Initialize variables that are incremented with += -# +# check_PROGRAMS= check_LTLIBRARIES= TESTS= @@ -61,9 +61,9 @@ tmodule_LTLIBRARIES= # Unit test program # # Unit tests are built as a single program to reduce valgrind overhead -# when running the tests. If you want to build a subset of the tests do +# when running the tests. 
If you want to build a subset of the tests do # rm -f unit_test; make unit_test unit_test_OBJECTS="unit_test.o SelectedTest.o" -# +# TESTS+=unit_test check_PROGRAMS+=unit_test @@ -124,7 +124,8 @@ unit_test_SOURCES= unit_test.cpp unit_te Variant.cpp \ Address.cpp \ ClientMessage.cpp \ - Qmf2.cpp + Qmf2.cpp \ + BrokerClusterCalls.cpp if HAVE_XML unit_test_SOURCES+= XmlClientSessionTest.cpp @@ -188,32 +189,32 @@ qpid_send_LDADD = $(lib_messaging) qpidtest_PROGRAMS+=qpid-perftest qpid_perftest_SOURCES=qpid-perftest.cpp test_tools.h TestOptions.h ConnectionOptions.h qpid_perftest_INCLUDES=$(PUBLIC_INCLUDES) -qpid_perftest_LDADD=$(lib_client) +qpid_perftest_LDADD=$(lib_client) qpidtest_PROGRAMS+=qpid-txtest qpid_txtest_INCLUDES=$(PUBLIC_INCLUDES) qpid_txtest_SOURCES=qpid-txtest.cpp TestOptions.h ConnectionOptions.h -qpid_txtest_LDADD=$(lib_client) +qpid_txtest_LDADD=$(lib_client) qpidtest_PROGRAMS+=qpid-latency-test qpid_latency_test_INCLUDES=$(PUBLIC_INCLUDES) qpid_latency_test_SOURCES=qpid-latency-test.cpp TestOptions.h ConnectionOptions.h -qpid_latency_test_LDADD=$(lib_client) +qpid_latency_test_LDADD=$(lib_client) qpidtest_PROGRAMS+=qpid-client-test qpid_client_test_INCLUDES=$(PUBLIC_INCLUDES) qpid_client_test_SOURCES=qpid-client-test.cpp TestOptions.h ConnectionOptions.h -qpid_client_test_LDADD=$(lib_client) +qpid_client_test_LDADD=$(lib_client) qpidtest_PROGRAMS+=qpid-topic-listener qpid_topic_listener_INCLUDES=$(PUBLIC_INCLUDES) qpid_topic_listener_SOURCES=qpid-topic-listener.cpp TestOptions.h ConnectionOptions.h -qpid_topic_listener_LDADD=$(lib_client) +qpid_topic_listener_LDADD=$(lib_client) qpidtest_PROGRAMS+=qpid-topic-publisher qpid_topic_publisher_INCLUDES=$(PUBLIC_INCLUDES) qpid_topic_publisher_SOURCES=qpid-topic-publisher.cpp TestOptions.h ConnectionOptions.h -qpid_topic_publisher_LDADD=$(lib_client) +qpid_topic_publisher_LDADD=$(lib_client) qpidtest_PROGRAMS+=qpid-ping qpid_ping_INCLUDES=$(PUBLIC_INCLUDES) @@ -232,17 +233,17 @@ 
echotest_LDADD=$(lib_client) check_PROGRAMS+=publish publish_INCLUDES=$(PUBLIC_INCLUDES) publish_SOURCES=publish.cpp TestOptions.h ConnectionOptions.h -publish_LDADD=$(lib_client) +publish_LDADD=$(lib_client) check_PROGRAMS+=consume consume_INCLUDES=$(PUBLIC_INCLUDES) consume_SOURCES=consume.cpp TestOptions.h ConnectionOptions.h -consume_LDADD=$(lib_client) +consume_LDADD=$(lib_client) check_PROGRAMS+=header_test header_test_INCLUDES=$(PUBLIC_INCLUDES) header_test_SOURCES=header_test.cpp TestOptions.h ConnectionOptions.h -header_test_LDADD=$(lib_client) +header_test_LDADD=$(lib_client) check_PROGRAMS+=failover_soak failover_soak_INCLUDES=$(PUBLIC_INCLUDES) @@ -251,28 +252,28 @@ failover_soak_LDADD=$(lib_client) $(lib_ check_PROGRAMS+=declare_queues declare_queues_INCLUDES=$(PUBLIC_INCLUDES) -declare_queues_SOURCES=declare_queues.cpp -declare_queues_LDADD=$(lib_client) +declare_queues_SOURCES=declare_queues.cpp +declare_queues_LDADD=$(lib_client) check_PROGRAMS+=replaying_sender replaying_sender_INCLUDES=$(PUBLIC_INCLUDES) -replaying_sender_SOURCES=replaying_sender.cpp -replaying_sender_LDADD=$(lib_client) +replaying_sender_SOURCES=replaying_sender.cpp +replaying_sender_LDADD=$(lib_client) check_PROGRAMS+=resuming_receiver resuming_receiver_INCLUDES=$(PUBLIC_INCLUDES) -resuming_receiver_SOURCES=resuming_receiver.cpp -resuming_receiver_LDADD=$(lib_client) +resuming_receiver_SOURCES=resuming_receiver.cpp +resuming_receiver_LDADD=$(lib_client) check_PROGRAMS+=txshift txshift_INCLUDES=$(PUBLIC_INCLUDES) txshift_SOURCES=txshift.cpp TestOptions.h ConnectionOptions.h -txshift_LDADD=$(lib_client) +txshift_LDADD=$(lib_client) check_PROGRAMS+=txjob txjob_INCLUDES=$(PUBLIC_INCLUDES) txjob_SOURCES=txjob.cpp TestOptions.h ConnectionOptions.h -txjob_LDADD=$(lib_client) +txjob_LDADD=$(lib_client) check_PROGRAMS+=PollerTest PollerTest_SOURCES=PollerTest.cpp @@ -307,7 +308,7 @@ TESTS_ENVIRONMENT = \ VALGRIND=$(VALGRIND) \ LIBTOOL="$(LIBTOOL)" \ QPID_DATA_DIR= \ - $(srcdir)/run_test 
+ $(srcdir)/run_test system_tests = qpid-client-test quick_perftest quick_topictest run_header_test quick_txtest TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests \ @@ -352,7 +353,8 @@ EXTRA_DIST += \ start_broker.ps1 \ stop_broker.ps1 \ topictest.ps1 \ - run_queue_flow_limit_tests + run_queue_flow_limit_tests \ + run_cluster_authentication_test check_LTLIBRARIES += libdlclose_noop.la libdlclose_noop_la_LDFLAGS = -module -rpath $(abs_builddir) @@ -375,6 +377,7 @@ EXTRA_DIST+= \ run_failover_soak \ reliable_replication_test \ federated_cluster_test_with_node_failure \ + run_cluster_authentication_soak \ sasl_test_setup.sh check-long: Modified: qpid/branches/qpid-2920/qpid/cpp/src/tests/brokertest.py URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/tests/brokertest.py?rev=1073448&r1=1073447&r2=1073448&view=diff ============================================================================== --- qpid/branches/qpid-2920/qpid/cpp/src/tests/brokertest.py (original) +++ qpid/branches/qpid-2920/qpid/cpp/src/tests/brokertest.py Tue Feb 22 18:23:06 2011 @@ -437,17 +437,25 @@ class Cluster: _cluster_count = 0 - def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True): + def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True, + cluster2=False): + if cluster2: + cluster_name = "--cluster2-name" + cluster_lib = BrokerTest.cluster2_lib + else: + cluster_name = "--cluster-name" + cluster_lib = BrokerTest.cluster_lib self.test = test self._brokers=[] self.name = "cluster%d" % Cluster._cluster_count Cluster._cluster_count += 1 # Use unique cluster name self.args = copy(args) - self.args += [ "--cluster-name", "%s-%s:%d" % (self.name, socket.gethostname(), os.getpid()) ] + self.args += [ cluster_name, + "%s-%s:%d" % (self.name, socket.gethostname(), os.getpid()) ] self.args += [ "--log-enable=info+", "--log-enable=debug+:cluster"] - assert BrokerTest.cluster_lib, "Cannot locate cluster 
plug-in" - self.args += [ "--load-module", BrokerTest.cluster_lib ] + assert cluster_lib, "Cannot locate cluster plug-in" + self.args += [ "--load-module", cluster_lib ] self.start_n(count, expect=expect, wait=wait) def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[], port=0): @@ -473,6 +481,7 @@ class BrokerTest(TestCase): # Environment settings. qpidd_exec = os.path.abspath(checkenv("QPIDD_EXEC")) cluster_lib = os.getenv("CLUSTER_LIB") + cluster2_lib = os.getenv("CLUSTER2_LIB") xml_lib = os.getenv("XML_LIB") qpid_config_exec = os.getenv("QPID_CONFIG_EXEC") qpid_route_exec = os.getenv("QPID_ROUTE_EXEC") @@ -523,9 +532,9 @@ class BrokerTest(TestCase): raise RethrownException("Failed to start broker %s(%s): %s" % (b.name, b.log, e)) return b - def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True): + def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True, cluster2=False): """Create and return a cluster ready for use""" - cluster = Cluster(self, count, args, expect=expect, wait=wait) + cluster = Cluster(self, count, args, expect=expect, wait=wait, cluster2=cluster2) return cluster def assert_browse(self, session, queue, expect_contents, timeout=0): Modified: qpid/branches/qpid-2920/qpid/cpp/src/tests/cluster.mk URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/tests/cluster.mk?rev=1073448&r1=1073447&r2=1073448&view=diff ============================================================================== --- qpid/branches/qpid-2920/qpid/cpp/src/tests/cluster.mk (original) +++ qpid/branches/qpid-2920/qpid/cpp/src/tests/cluster.mk Tue Feb 22 18:23:06 2011 @@ -92,7 +92,7 @@ cluster_test_SOURCES = \ PartialFailure.cpp \ ClusterFailover.cpp -cluster_test_LDADD=$(lib_client) $(lib_broker) ../cluster.la -lboost_unit_test_framework +cluster_test_LDADD=$(lib_client) $(lib_broker) $(lib_messaging) ../cluster.la -lboost_unit_test_framework qpidtest_SCRIPTS += run_cluster_tests brokertest.py cluster_tests.py 
cluster_test_logs.py run_long_cluster_tests long_cluster_tests.py testlib.py cluster_tests.fail qpidtest_SCRIPTS += $(CLUSTER_TEST_SCRIPTS_LIST) Added: qpid/branches/qpid-2920/qpid/cpp/src/tests/cluster2_tests.py URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/tests/cluster2_tests.py?rev=1073448&view=auto ============================================================================== --- qpid/branches/qpid-2920/qpid/cpp/src/tests/cluster2_tests.py (added) +++ qpid/branches/qpid-2920/qpid/cpp/src/tests/cluster2_tests.py Tue Feb 22 18:23:06 2011 @@ -0,0 +1,116 @@ +#!/usr/bin/env python + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +import os, signal, sys, time, imp, re, subprocess +from qpid import datatypes, messaging +from brokertest import * +from qpid.harness import Skipped +from qpid.messaging import Message +from qpid.messaging.exceptions import * +from threading import Thread, Lock +from logging import getLogger +from itertools import chain + +log = getLogger("qpid.cluster_tests") + +class Cluster2Tests(BrokerTest): + """Tests for new cluster code.""" + + def verify_content(self, content, receiver): + for c in content: self.assertEqual(c, receiver.fetch(1).content) + self.assertRaises(Empty, receiver.fetch, 0) + + def test_message_enqueue(self): + """Test basic replication of enqueued messages. + Verify that fanout messages are replicated correctly. + """ + + cluster = self.cluster(2, cluster2=True) + + sn0 = cluster[0].connect().session() + r0p = sn0.receiver("p; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:p}]}}"); + r0q = sn0.receiver("q; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:q}]}}"); + s0 = sn0.sender("amq.fanout"); + + sn1 = cluster[1].connect().session() + r1p = sn1.receiver("p; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:p}]}}"); + r1q = sn1.receiver("q; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:q}]}}"); + + + # Send messages on member 0 + content = ["a","b","c"] + for m in content: s0.send(Message(m)) + + # Browse on both members. 
+ self.verify_content(content, r0p) + self.verify_content(content, r0q) + self.verify_content(content, r1p) + self.verify_content(content, r1q) + + sn1.connection.close() + sn0.connection.close() + + def test_message_dequeue(self): + """Test replication of dequeues""" + cluster = self.cluster(2, cluster2=True) + sn0 = cluster[0].connect().session() + s0 = sn0.sender("q;{create:always,delete:always}") + r0 = sn0.receiver("q") + sn1 = cluster[1].connect().session() + r1 = sn1.receiver("q;{create:always}") # Not yet replicating wiring. + + content = ["a","b","c"] + for m in content: s0.send(Message(m)) + # Verify enqueued on cluster[1] + self.verify_content(content, sn1.receiver("q;{mode:browse}")) + # Dequeue on cluster[0] + self.assertEqual(r0.fetch(1).content, "a") + sn0.acknowledge(sync=True) + + # Verify dequeued on cluster[0] and cluster[1] + self.verify_content(["b", "c"], sn0.receiver("q;{mode:browse}")) + self.verify_content(["b", "c"], sn1.receiver("q;{mode:browse}")) + + def test_wiring(self): + """Test replication of wiring""" + cluster = self.cluster(2, cluster2=True) + sn0 = cluster[0].connect().session() + sn1 = cluster[1].connect().session() + + # Test creation of queue, exchange, binding + r0ex = sn0.receiver("ex; {create:always, delete:always, node:{type:topic, x-declare:{name:ex, type:'direct'}}}") + r0q = sn0.receiver("q; {create:always, delete:always, link:{x-bindings:[{exchange:ex,queue:q,key:k}]}}") + + # Verify objects were created on member 1 + r1 = sn1.receiver("q") # Queue + s1ex = sn1.sender("ex/k; {node:{type:topic}}"); # Exchange + s1ex.send(Message("x")) # Binding with key k + self.assertEqual(r1.fetch(1).content, "x") + + # Test destroy. + r0q.close() # Delete queue q + self.assertRaises(NotFound, sn1.receiver, "q") + r0ex.close() # Delete exchange ex + # FIXME aconway 2010-11-05: this does not raise NotFound, sn1 is caching "ex" + # self.assertRaises(NotFound, sn1.sender, "ex") + # Have to create a new session. 
+ self.assertRaises(NotFound, cluster[1].connect().session().receiver, "ex") + + # FIXME aconway 2010-10-29: test unbind, may need to use old API. Propchange: qpid/branches/qpid-2920/qpid/cpp/src/tests/cluster2_tests.py ------------------------------------------------------------------------------ svn:eol-style = native Propchange: qpid/branches/qpid-2920/qpid/cpp/src/tests/cluster2_tests.py ------------------------------------------------------------------------------ svn:executable = * Modified: qpid/branches/qpid-2920/qpid/cpp/src/tests/run_cluster_tests URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/tests/run_cluster_tests?rev=1073448&r1=1073447&r2=1073448&view=diff ============================================================================== --- qpid/branches/qpid-2920/qpid/cpp/src/tests/run_cluster_tests (original) +++ qpid/branches/qpid-2920/qpid/cpp/src/tests/run_cluster_tests Tue Feb 22 18:23:06 2011 @@ -33,5 +33,5 @@ mkdir -p $OUTDIR CLUSTER_TESTS_IGNORE=${CLUSTER_TESTS_IGNORE:--i cluster_tests.StoreTests.* -I $srcdir/cluster_tests.fail} CLUSTER_TESTS=${CLUSTER_TESTS:-$*} -with_ais_group $QPID_PYTHON_TEST -DOUTDIR=$OUTDIR -m cluster_tests $CLUSTER_TESTS_IGNORE $CLUSTER_TESTS || exit 1 +with_ais_group $QPID_PYTHON_TEST -DOUTDIR=$OUTDIR -m cluster_tests -m cluster2_tests $CLUSTER_TESTS_IGNORE $CLUSTER_TESTS || exit 1 rm -rf $OUTDIR Modified: qpid/branches/qpid-2920/qpid/cpp/src/tests/test_env.sh.in URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/tests/test_env.sh.in?rev=1073448&r1=1073447&r2=1073448&view=diff ============================================================================== --- qpid/branches/qpid-2920/qpid/cpp/src/tests/test_env.sh.in (original) +++ qpid/branches/qpid-2920/qpid/cpp/src/tests/test_env.sh.in Tue Feb 22 18:23:06 2011 @@ -63,6 +63,7 @@ export TEST_STORE_LIB=$testmoduledir/tes exportmodule() { test -f $moduledir/$2 && eval "export $1=$moduledir/$2"; } exportmodule ACL_LIB acl.so 
exportmodule CLUSTER_LIB cluster.so +exportmodule CLUSTER2_LIB cluster2.so exportmodule REPLICATING_LISTENER_LIB replicating_listener.so exportmodule REPLICATION_EXCHANGE_LIB replication_exchange.so exportmodule SSLCONNECTOR_LIB sslconnector.so Modified: qpid/branches/qpid-2920/qpid/cpp/xml/cluster.xml URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/xml/cluster.xml?rev=1073448&r1=1073447&r2=1073448&view=diff ============================================================================== --- qpid/branches/qpid-2920/qpid/cpp/xml/cluster.xml (original) +++ qpid/branches/qpid-2920/qpid/cpp/xml/cluster.xml Tue Feb 22 18:23:06 2011 @@ -284,4 +284,62 @@ </control> </class> + + <!-- TODO aconway 2010-10-20: Experimental classes for new cluster. --> + + <!-- Message delivery and disposition --> + <class name="cluster-message" code="0x82"> + <!-- FIXME aconway 2010-10-19: create message in fragments --> + <control name="routing" code="0x1"> + <field name="routing-id" type="uint32"/> + <field name="message" type="str32"/> + </control> + + <control name="enqueue" code="0x2"> + <field name="routing-id" type="uint32"/> + <field name="queue" type="queue.name"/> + </control> + + <control name="routed" code="0x3"> + <field name="routing-id" type="uint32"/> + </control> + + <control name="dequeue" code="0x4"> + <field name="queue" type="queue.name"/> + <field name="position" type="uint32"/> + </control> + </class> + + <class name="cluster-wiring" code="0x83"> + <control name="create-queue" code="0x1"> + <field name="data" type="str32"/> + </control> + + <control name="destroy-queue" code="0x2"> + <field name="name" type="queue.name"/> + </control> + + <control name="create-exchange" code="0x3"> + <field name="data" type="str32"/> + </control> + + <control name="destroy-exchange" code="0x4"> + <field name="name" type="exchange.name"/> + </control> + + <control name="bind" code="0x5"> + <field name="queue" type="queue.name"/> + <field name="exchange" 
type="exchange.name"/> + <field name="binding-key" type="str8"/> + <field name="arguments" type="map"/> + </control> + + <control name="unbind" code="0x6"> + <field name="queue" type="queue.name"/> + <field name="exchange" type="exchange.name"/> + <field name="binding-key" type="str8"/> + <field name="arguments" type="map"/> + </control> + + </class> </amqp> --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org