Author: aconway Date: Mon Feb 9 22:25:26 2009 New Revision: 742774 URL: http://svn.apache.org/viewvc?rev=742774&view=rev Log: Cluster support for message time-to-live.
Added: qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp (with props) qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.h (with props) qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp (with props) qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h (with props) Modified: qpid/trunk/qpid/cpp/examples/tradedemo/ (props changed) qpid/trunk/qpid/cpp/src/Makefile.am qpid/trunk/qpid/cpp/src/cluster.mk qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp qpid/trunk/qpid/cpp/src/qpid/broker/Message.h qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp qpid/trunk/qpid/cpp/xml/cluster.xml Propchange: qpid/trunk/qpid/cpp/examples/tradedemo/ ------------------------------------------------------------------------------ --- svn:ignore (added) +++ svn:ignore Mon Feb 9 22:25:26 2009 @@ -0,0 +1 @@ +Makefile.in Modified: qpid/trunk/qpid/cpp/src/Makefile.am URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=742774&r1=742773&r2=742774&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/Makefile.am (original) +++ qpid/trunk/qpid/cpp/src/Makefile.am Mon Feb 9 22:25:26 2009 @@ -358,6 +358,8 @@ qpid/broker/Broker.cpp \ qpid/broker/BrokerSingleton.cpp \ qpid/broker/Exchange.cpp \ + qpid/broker/ExpiryPolicy.h \ + qpid/broker/ExpiryPolicy.cpp \ qpid/broker/Queue.cpp \ 
qpid/broker/QueueCleaner.cpp \ qpid/broker/QueueListeners.cpp \ Modified: qpid/trunk/qpid/cpp/src/cluster.mk URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.mk?rev=742774&r1=742773&r2=742774&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/cluster.mk (original) +++ qpid/trunk/qpid/cpp/src/cluster.mk Mon Feb 9 22:25:26 2009 @@ -63,6 +63,8 @@ qpid/cluster/Event.h \ qpid/cluster/EventFrame.h \ qpid/cluster/EventFrame.cpp \ + qpid/cluster/ExpiryPolicy.h \ + qpid/cluster/ExpiryPolicy.cpp \ qpid/cluster/FailoverExchange.cpp \ qpid/cluster/FailoverExchange.h \ qpid/cluster/Multicaster.cpp \ Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=742774&r1=742773&r2=742774&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Mon Feb 9 22:25:26 2009 @@ -30,6 +30,7 @@ #include "SecureConnectionFactory.h" #include "TopicExchange.h" #include "Link.h" +#include "ExpiryPolicy.h" #include "qmf/org/apache/qpid/broker/Package.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h" @@ -150,6 +151,7 @@ queueCleaner(queues, timer), queueEvents(poller), recovery(true), + expiryPolicy(new ExpiryPolicy), getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)) { if (conf.enableMgmt) { Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=742774&r1=742773&r2=742774&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Mon Feb 9 22:25:26 2009 @@ -36,6 +36,7 @@ #include "Vhost.h" #include "System.h" #include "Timer.h" +#include 
"ExpiryPolicy.h" #include "qpid/management/Manageable.h" #include "qpid/management/ManagementBroker.h" #include "qmf/org/apache/qpid/broker/Broker.h" @@ -65,6 +66,8 @@ namespace broker { +class ExpiryPolicy; + static const uint16_t DEFAULT_PORT=5672; struct NoSuchTransportException : qpid::Exception @@ -111,6 +114,8 @@ private: typedef std::map<std::string, boost::shared_ptr<sys::ProtocolFactory> > ProtocolFactoryMap; + void declareStandardExchange(const std::string& name, const std::string& type); + boost::shared_ptr<sys::Poller> poller; Options config; management::ManagementAgent::Singleton managementAgentSingleton; @@ -132,14 +137,11 @@ System::shared_ptr systemObject; QueueCleaner queueCleaner; QueueEvents queueEvents; - - void declareStandardExchange(const std::string& name, const std::string& type); - std::vector<Url> knownBrokers; std::vector<Url> getKnownBrokersImpl(); std::string federationTag; - bool recovery; + boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; public: @@ -180,6 +182,9 @@ Options& getOptions() { return config; } QueueEvents& getQueueEvents() { return queueEvents; } + void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; } + boost::intrusive_ptr<ExpiryPolicy> getExpiryPolicy() { return expiryPolicy; } + SessionManager& getSessionManager() { return sessionManager; } const std::string& getFederationTag() const { return federationTag; } Added: qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp?rev=742774&view=auto ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp (added) +++ qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp Mon Feb 9 22:25:26 2009 @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ExpiryPolicy.h" +#include "Message.h" +#include "qpid/sys/Time.h" + +namespace qpid { +namespace broker { + +ExpiryPolicy::~ExpiryPolicy() {} + +void ExpiryPolicy::willExpire(Message&) {} + +bool ExpiryPolicy::hasExpired(Message& m) { + return m.getExpiration() < sys::AbsTime::now(); +} + +}} // namespace qpid::broker Propchange: qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp ------------------------------------------------------------------------------ svn:eol-style = native Propchange: qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.h?rev=742774&view=auto ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.h (added) +++ qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.h Mon Feb 9 22:25:26 2009 @@ -0,0 +1,44 @@ +#ifndef QPID_BROKER_EXPIRYPOLICY_H +#define QPID_BROKER_EXPIRYPOLICY_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/RefCounted.h" + +namespace qpid { +namespace broker { + +class Message; + +/** + * Default expiry policy. + */ +class ExpiryPolicy : public RefCounted +{ + public: + virtual ~ExpiryPolicy(); + virtual void willExpire(Message&); + virtual bool hasExpired(Message&); +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_EXPIRYPOLICY_H*/ Propchange: qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.h ------------------------------------------------------------------------------ svn:eol-style = native Propchange: qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.h ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=742774&r1=742773&r2=742774&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Mon Feb 9 22:25:26 2009 @@ -21,6 +21,7 @@ #include "Message.h" #include "ExchangeRegistry.h" +#include "ExpiryPolicy.h" #include "qpid/StringUtils.h" #include "qpid/framing/frame_functors.h" #include 
"qpid/framing/FieldTable.h" @@ -316,24 +317,29 @@ } } -void Message::setTimestamp() +void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e) { DeliveryProperties* props = getProperties<DeliveryProperties>(); - //Spec states that timestamp should be set, evaluate the - //performance impact before re-enabling this: - //time_t now = ::time(0); - //props->setTimestamp(now); if (props->getTtl()) { - //set expiration (nb: ttl is in millisecs, time_t is in secs) + // AMQP requires setting the expiration property to be posix + // time_t in seconds. TTL is in milliseconds time_t now = ::time(0); props->setExpiration(now + (props->getTtl()/1000)); + // Use higher resolution time for the internal expiry calculation. expiration = AbsTime(AbsTime::now(), Duration(props->getTtl() * TIME_MSEC)); + setExpiryPolicy(e); } } -bool Message::hasExpired() const +void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { + expiryPolicy = e; + if (expiryPolicy) + expiryPolicy->willExpire(*this); +} + +bool Message::hasExpired() { - return expiration < FAR_FUTURE && expiration < AbsTime::now(); + return expiryPolicy && expiryPolicy->hasExpired(*this); } boost::intrusive_ptr<Message>& Message::getReplacementMessage(const Queue* qfor) const Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=742774&r1=742773&r2=742774&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Mon Feb 9 22:25:26 2009 @@ -45,6 +45,7 @@ class ExchangeRegistry; class MessageStore; class Queue; +class ExpiryPolicy; class Message : public PersistableMessage { public: @@ -73,8 +74,11 @@ const framing::FieldTable* getApplicationHeaders() const; bool isPersistent(); bool requiresAccept(); - void setTimestamp(); - bool hasExpired() const; + + void 
setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e); + void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e); + bool hasExpired(); + sys::AbsTime getExpiration() const { return expiration; } framing::FrameSet& getFrames() { return frames; } const framing::FrameSet& getFrames() const { return frames; } @@ -171,6 +175,7 @@ ConnectionToken* publisher; mutable MessageAdapter* adapter; qpid::sys::AbsTime expiration; + boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; static TransferAdapter TRANSFER; Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=742774&r1=742773&r2=742774&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Mon Feb 9 22:25:26 2009 @@ -358,14 +358,13 @@ std::string exchangeName = msg->getExchangeName(); //TODO: the following should be hidden behind message (using MessageAdapter or similar) - // Do not replace the delivery-properties.exchange if it is is already set. - // This is used internally (by the cluster) to force the exchange name on a message. - // The client library ensures this is always empty for messages from normal clients. if (msg->isA<MessageTransferBody>()) { - if (!msg->hasProperties<DeliveryProperties>() || - msg->getProperties<DeliveryProperties>()->getExchange().empty()) + // Do not replace the delivery-properties.exchange if it is is already set. + // This is used internally (by the cluster) to force the exchange name on a message. + // The client library ensures this is always empty for messages from normal clients. 
+ if (!msg->hasProperties<DeliveryProperties>() || msg->getProperties<DeliveryProperties>()->getExchange().empty()) msg->getProperties<DeliveryProperties>()->setExchange(exchangeName); - msg->setTimestamp(); + msg->setTimestamp(getSession().getBroker().getExpiryPolicy()); } if (!cacheExchange || cacheExchange->getName() != exchangeName){ cacheExchange = session.getBroker().getExchanges().get(exchangeName); Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=742774&r1=742773&r2=742774&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Mon Feb 9 22:25:26 2009 @@ -76,6 +76,7 @@ void ready(const std::string& url) { cluster.ready(member, url, l); } void configChange(const std::string& addresses) { cluster.configChange(member, addresses, l); } void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); } + void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); } void shutdown() { cluster.shutdown(member, l); } bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); } @@ -103,6 +104,8 @@ poller), connections(*this), decoder(boost::bind(&PollableFrameQueue::push, &deliverFrameQueue, _1)), + expiryPolicy(new ExpiryPolicy(boost::bind(&Cluster::isLeader, this), mcast, myId, broker.getTimer())), + frameId(0), initialized(false), state(INIT), lastSize(0), @@ -134,6 +137,7 @@ myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT)); QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl); broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); + broker.setExpiryPolicy(expiryPolicy); dispatcher.start(); deliverEventQueue.start(); deliverFrameQueue.start(); @@ -238,7 +242,8 @@ // Handler for 
deliverFrameQueue void Cluster::deliveredFrame(const EventFrame& e) { - Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); + const_cast<AMQFrame&>(e.frame).setClusterId(frameId++); QPID_LOG(trace, *this << " DLVR: " << e); QPID_LATENCY_RECORD("delivered frame queue", e.frame); if (e.isCluster()) { // Cluster control frame @@ -333,22 +338,23 @@ state = JOINER; QPID_LOG(info, *this << " joining cluster: " << map); mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), myId); - ClusterMap::Set members = map.getAlive(); - members.erase(myId); - myElders = members; + elders = map.getAlive(); + elders.erase(myId); broker.getLinks().setPassive(true); } } else if (state >= READY && memberChange) { memberUpdate(l); - myElders = ClusterMap::intersection(myElders, map.getAlive()); - if (myElders.empty()) { + elders = ClusterMap::intersection(elders, map.getAlive()); + if (elders.empty()) { //assume we are oldest, reactive links if necessary broker.getLinks().setPassive(false); } } } +bool Cluster::isLeader() const { return elders.empty(); } + void Cluster::tryMakeOffer(const MemberId& id, Lock& ) { if (state == READY && map.isJoiner(id)) { state = OFFER; @@ -420,15 +426,16 @@ deliverFrameQueue.stop(); if (updateThread.id()) updateThread.join(); // Join the previous updatethread. updateThread = Thread( - new UpdateClient(myId, updatee, url, broker, map, connections.values(), - boost::bind(&Cluster::updateOutDone, this), - boost::bind(&Cluster::updateOutError, this, _1))); + new UpdateClient(myId, updatee, url, broker, map, frameId, connections.values(), + boost::bind(&Cluster::updateOutDone, this), + boost::bind(&Cluster::updateOutError, this, _1))); } // Called in update thread. 
-void Cluster::updateInDone(const ClusterMap& m) { +void Cluster::updateInDone(const ClusterMap& m, uint64_t fid) { Lock l(lock); updatedMap = m; + frameId = fid; checkUpdateIn(l); } @@ -573,4 +580,8 @@ QPID_LOG(debug, *this << " cluster-id = " << clusterId); } +void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) { + expiryPolicy->deliverExpire(id); +} + }} // namespace qpid::cluster Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=742774&r1=742773&r2=742774&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Mon Feb 9 22:25:26 2009 @@ -31,6 +31,7 @@ #include "Quorum.h" #include "Decoder.h" #include "PollableQueue.h" +#include "ExpiryPolicy.h" #include "qpid/broker/Broker.h" #include "qpid/sys/Monitor.h" @@ -89,7 +90,7 @@ void leave(); // Update completed - called in update thread - void updateInDone(const ClusterMap&); + void updateInDone(const ClusterMap&, uint64_t frameId); MemberId getId() const; broker::Broker& getBroker() const; @@ -100,6 +101,8 @@ size_t getReadMax() { return readMax; } size_t getWriteEstimate() { return writeEstimate; } + + bool isLeader() const; // Called in deliver thread. 
private: typedef sys::Monitor::ScopedLock Lock; @@ -129,6 +132,7 @@ void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&); void ready(const MemberId&, const std::string&, Lock&); void configChange(const MemberId&, const std::string& addresses, Lock& l); + void messageExpired(const MemberId&, uint64_t, Lock& l); void shutdown(const MemberId&, Lock&); void deliveredEvent(const Event&); void deliveredFrame(const EventFrame&); @@ -185,7 +189,6 @@ const size_t writeEstimate; framing::Uuid clusterId; NoOpConnectionOutputHandler shadowOut; - ClusterMap::Set myElders; qpid::management::ManagementAgent* mAgent; // Thread safe members @@ -197,8 +200,11 @@ boost::shared_ptr<FailoverExchange> failoverExchange; Quorum quorum; - // Called only from event delivery thread + // Used only in delivery thread Decoder decoder; + ClusterMap::Set elders; + boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; + uint64_t frameId; // Used only during initialization bool initialized; Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=742774&r1=742773&r2=742774&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Mon Feb 9 22:25:26 2009 @@ -54,7 +54,7 @@ bool quorum; size_t readMax, writeEstimate; - ClusterValues() : quorum(false), readMax(3), writeEstimate(64) {} + ClusterValues() : quorum(false), readMax(10), writeEstimate(64) {} Url getUrl(uint16_t port) const { if (url.empty()) return Url::getIpAddressesUrl(port); Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=742774&r1=742773&r2=742774&view=diff ============================================================================== --- 
qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Mon Feb 9 22:25:26 2009 @@ -127,10 +127,6 @@ case DTX_CLASS_ID: message = "DTX transactions are not currently supported by cluster."; break; } } - else if (body.type() == HEADER_BODY) { - const DeliveryProperties* dp = static_cast<const AMQHeaderBody&>(body).get<DeliveryProperties>(); - if (dp && dp->getTtl()) message = "Message TTL is not currently supported by cluster."; - } if (!message.empty()) connection.close(connection::CLOSE_CODE_FRAMING_ERROR, message); return !message.empty(); @@ -259,9 +255,9 @@ self = shadow; } -void Connection::membership(const FieldTable& joiners, const FieldTable& members) { +void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameId) { QPID_LOG(debug, cluster << " incoming update complete on connection " << *this); - cluster.updateInDone(ClusterMap(joiners, members)); + cluster.updateInDone(ClusterMap(joiners, members), frameId); self.second = 0; // Mark this as completed update connection. 
} Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=742774&r1=742773&r2=742774&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Mon Feb 9 22:25:26 2009 @@ -119,7 +119,7 @@ void shadowReady(uint64_t memberId, uint64_t connectionId); - void membership(const framing::FieldTable&, const framing::FieldTable&); + void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameId); void deliveryRecord(const std::string& queue, const framing::SequenceNumber& position, Added: qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp?rev=742774&view=auto ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp (added) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp Mon Feb 9 22:25:26 2009 @@ -0,0 +1,80 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +#include "ExpiryPolicy.h" +#include "Multicaster.h" +#include "qpid/framing/ClusterMessageExpiredBody.h" +#include "qpid/sys/Time.h" +#include "qpid/broker/Message.h" +#include "qpid/broker/Timer.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace cluster { + +ExpiryPolicy::ExpiryPolicy(const boost::function<bool()> & f, Multicaster& m, const MemberId& id, broker::Timer& t) + : expiredPolicy(new Expired), isLeader(f), mcast(m), memberId(id), timer(t) {} + +namespace { +uint64_t clusterId(const broker::Message& m) { + assert(m.getFrames().begin() != m.getFrames().end()); + return m.getFrames().begin()->getClusterId(); +} + +struct ExpiryTask : public broker::TimerTask { + ExpiryTask(const boost::intrusive_ptr<ExpiryPolicy>& policy, uint64_t id, sys::AbsTime when) + : TimerTask(when), expiryPolicy(policy), messageId(id) {} + void fire() { expiryPolicy->sendExpire(messageId); } + boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; + const uint64_t messageId; +}; +} + +void ExpiryPolicy::willExpire(broker::Message& m) { + timer.add(new ExpiryTask(this, clusterId(m), m.getExpiration())); +} + +bool ExpiryPolicy::hasExpired(broker::Message& m) { + sys::Mutex::ScopedLock l(lock); + IdSet::iterator i = expired.find(clusterId(m)); + if (i != expired.end()) { + expired.erase(i); + const_cast<broker::Message&>(m).setExpiryPolicy(expiredPolicy); // hasExpired() == true; + return true; + } + return false; +} + +void ExpiryPolicy::sendExpire(uint64_t id) { + sys::Mutex::ScopedLock l(lock); + if (isLeader()) + mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id), memberId); +} + +void ExpiryPolicy::deliverExpire(uint64_t id) { + sys::Mutex::ScopedLock l(lock); + expired.insert(id); +} + +bool ExpiryPolicy::Expired::hasExpired(broker::Message&) { return true; } +void ExpiryPolicy::Expired::willExpire(broker::Message&) { } + +}} // namespace qpid::cluster Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp 
------------------------------------------------------------------------------ svn:eol-style = native Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h?rev=742774&view=auto ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h (added) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h Mon Feb 9 22:25:26 2009 @@ -0,0 +1,76 @@ +#ifndef QPID_CLUSTER_EXPIRYPOLICY_H +#define QPID_CLUSTER_EXPIRYPOLICY_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +#include "types.h" +#include "qpid/broker/ExpiryPolicy.h" +#include "qpid/sys/Mutex.h" +#include <boost/function.hpp> +#include <boost/intrusive_ptr.hpp> +#include <set> + +namespace qpid { + +namespace broker { class Timer; } + +namespace cluster { +class Multicaster; + +/** + * Cluster expiry policy + */ +class ExpiryPolicy : public broker::ExpiryPolicy +{ + public: + ExpiryPolicy(const boost::function<bool()> & isLeader, Multicaster&, const MemberId&, broker::Timer&); + + void willExpire(broker::Message&); + + bool hasExpired(broker::Message&); + + // Send expiration notice to cluster. + void sendExpire(uint64_t); + + // Cluster delivers expiry notice. + void deliverExpire(uint64_t); + + private: + sys::Mutex lock; + typedef std::set<uint64_t> IdSet; + + struct Expired : public broker::ExpiryPolicy { + bool hasExpired(broker::Message&); + void willExpire(broker::Message&); + }; + + IdSet expired; + boost::intrusive_ptr<Expired> expiredPolicy; + boost::function<bool()> isLeader; + Multicaster& mcast; + MemberId memberId; + broker::Timer& timer; +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_EXPIRYPOLICY_H*/ Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h ------------------------------------------------------------------------------ svn:eol-style = native Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=742774&r1=742773&r2=742774&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Mon Feb 9 22:25:26 2009 @@ -86,10 +86,12 @@ // TODO aconway 2008-09-24: optimization: update 
connections/sessions in parallel. UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url, - broker::Broker& broker, const ClusterMap& m, const Cluster::Connections& cons, - const boost::function<void()>& ok, - const boost::function<void(const std::exception&)>& fail) - : updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m), connections(cons), + broker::Broker& broker, const ClusterMap& m, uint64_t frameId_, + const Cluster::Connections& cons, + const boost::function<void()>& ok, + const boost::function<void(const std::exception&)>& fail) + : updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m), + frameId(frameId_), connections(cons), connection(catchUpConnection()), shadowConnection(catchUpConnection()), done(ok), failed(fail) { @@ -120,6 +122,7 @@ ClusterConnectionMembershipBody membership; map.toMethodBody(membership); + membership.setFrameId(frameId); AMQFrame frame(membership); client::ConnectionAccess::getImpl(connection)->handle(frame); connection.close(); Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h?rev=742774&r1=742773&r2=742774&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h Mon Feb 9 22:25:26 2009 @@ -63,9 +63,10 @@ static const std::string UPDATE; // Name for special update queue and exchange. 
UpdateClient(const MemberId& updater, const MemberId& updatee, const Url&, - broker::Broker& donor, const ClusterMap& map, const std::vector<boost::intrusive_ptr<Connection> >& , - const boost::function<void()>& done, - const boost::function<void(const std::exception&)>& fail); + broker::Broker& donor, const ClusterMap& map, uint64_t sequence, + const std::vector<boost::intrusive_ptr<Connection> >& , + const boost::function<void()>& done, + const boost::function<void(const std::exception&)>& fail); ~UpdateClient(); void update(); @@ -89,6 +90,7 @@ Url updateeUrl; broker::Broker& updaterBroker; ClusterMap map; + uint64_t frameId; std::vector<boost::intrusive_ptr<Connection> > connections; client::Connection connection, shadowConnection; client::AsyncSession session, shadowSession; Modified: qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h?rev=742774&r1=742773&r2=742774&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h Mon Feb 9 22:25:26 2009 @@ -92,6 +92,9 @@ /** Must point to at least DECODE_SIZE_MIN bytes of data */ static uint16_t decodeSize(char* data); + uint64_t getClusterId() const { return clusterId; } + void setClusterId(uint64_t id) { clusterId = id; } + private: void init(); @@ -103,6 +106,7 @@ bool bos : 1; bool eos : 1; mutable uint32_t encodedSizeCache; + uint64_t clusterId; // Used to identify frames in a clustered broker. 
}; std::ostream& operator<<(std::ostream&, const AMQFrame&); Modified: qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h?rev=742774&r1=742773&r2=742774&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h Mon Feb 9 22:25:26 2009 @@ -49,6 +49,7 @@ bool isComplete() const; uint64_t getContentSize() const; + void getContent(std::string&) const; std::string getContent() const; @@ -73,6 +74,9 @@ return header ? header->get<T>() : 0; } + Frames::const_iterator begin() const { return parts.begin(); } + Frames::const_iterator end() const { return parts.end(); } + const SequenceNumber& getId() const { return id; } template <class P> void remove(P predicate) { Modified: qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=742774&r1=742773&r2=742774&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Mon Feb 9 22:25:26 2009 @@ -26,6 +26,7 @@ #include "qpid/broker/ExchangeRegistry.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/NullMessageStore.h" +#include "qpid/broker/ExpiryPolicy.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/client/QueueOptions.h" #include <iostream> @@ -491,7 +492,7 @@ } else { if (evenTtl) m->getProperties<DeliveryProperties>()->setTtl(evenTtl); } - m->setTimestamp(); + m->setTimestamp(new broker::ExpiryPolicy); queue.deliver(m); } } Modified: qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=742774&r1=742773&r2=742774&view=diff 
============================================================================== --- qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Mon Feb 9 22:25:26 2009 @@ -37,6 +37,7 @@ #include <boost/bind.hpp> #include <boost/shared_ptr.hpp> +#include <boost/assign.hpp> #include <string> #include <iostream> @@ -51,22 +52,23 @@ ostream& operator<<(ostream& o, const std::set<T>& s) { return seqPrint(o, s); } } - -QPID_AUTO_TEST_SUITE(cluster) +QPID_AUTO_TEST_SUITE(cluster_test) using namespace std; using namespace qpid; using namespace qpid::cluster; using namespace qpid::framing; using namespace qpid::client; -using qpid::sys::TIME_SEC; -using qpid::broker::Broker; +using namespace boost::assign; +using broker::Broker; using boost::shared_ptr; -using qpid::cluster::Cluster; + +// Timeout for tests that wait for messages +const sys::Duration TIMEOUT=sys::TIME_SEC/4; ostream& operator<<(ostream& o, const cpg_name* n) { - return o << qpid::cluster::Cpg::str(*n); + return o << cluster::Cpg::str(*n); } ostream& operator<<(ostream& o, const cpg_address& a) { @@ -94,7 +96,7 @@ BOOST_MESSAGE("knownBrokerPorts waiting for " << n << ": " << urls); // Retry up to 10 secs in .1 second intervals. for (size_t retry=100; urls.size() != unsigned(n) && retry != 0; --retry) { - ::usleep(1000*100); // 0.1 secs + sys::usleep(1000*100); // 0.1 secs urls = source.getKnownBrokers(); } } @@ -127,6 +129,45 @@ return m.getMessageProperties().getApplicationHeaders().getAsInt64("qpid.msg_sequence"); } +Message ttlMessage(const std::string& data, const std::string& key, uint64_t ttl) { + Message m(data, key); + m.getDeliveryProperties().setTtl(ttl); + return m; +} + +vector<std::string> browse(Client& c, const std::string& q, int n) { + SubscriptionSettings browseSettings( + FlowControl::unlimited(), + ACCEPT_MODE_NONE, + ACQUIRE_MODE_NOT_ACQUIRED, + 0 // No auto-ack. 
+ ); + LocalQueue lq; + c.subs.subscribe(lq, q, browseSettings); + vector<std::string> result; + for (int i = 0; i < n; ++i) { + result.push_back(lq.get(TIMEOUT).getData()); + } + c.subs.getSubscription(q).cancel(); + return result; +} + +QPID_AUTO_TEST_CASE(testMessageTimeToLive) { + // Note: this doesn't actually test for cluster race conditions around TTL, + // it just verifies that basic TTL functionality works. + // + ClusterFixture cluster(2); + Client c0(cluster[0], "c0"); + Client c1(cluster[1], "c1"); + c0.session.queueDeclare("q"); + c0.session.messageTransfer(arg::content=ttlMessage("a", "q", 200)); + c0.session.messageTransfer(arg::content=Message("b", "q")); + BOOST_CHECK_EQUAL(browse(c1, "q", 2), list_of<std::string>("a")("b")); + sys::usleep(300*1000); + BOOST_CHECK_EQUAL(browse(c0, "q", 1), list_of<std::string>("b")); + BOOST_CHECK_EQUAL(browse(c1, "q", 1), list_of<std::string>("b")); +} + QPID_AUTO_TEST_CASE(testSequenceOptions) { // Make sure the exchange qpid.msg_sequence property is properly replicated. 
ClusterFixture cluster(1); @@ -138,13 +179,13 @@ c0.session.exchangeBind(arg::exchange="ex", arg::queue="q", arg::bindingKey="k"); c0.session.messageTransfer(arg::content=Message("1", "k"), arg::destination="ex"); c0.session.messageTransfer(arg::content=Message("2", "k"), arg::destination="ex"); - BOOST_CHECK_EQUAL(1, getMsgSequence(c0.subs.get("q", TIME_SEC))); - BOOST_CHECK_EQUAL(2, getMsgSequence(c0.subs.get("q", TIME_SEC))); + BOOST_CHECK_EQUAL(1, getMsgSequence(c0.subs.get("q", TIMEOUT))); + BOOST_CHECK_EQUAL(2, getMsgSequence(c0.subs.get("q", TIMEOUT))); cluster.add(); Client c1(cluster[1]); c1.session.messageTransfer(arg::content=Message("3", "k"), arg::destination="ex"); - BOOST_CHECK_EQUAL(3, getMsgSequence(c1.subs.get("q", TIME_SEC))); + BOOST_CHECK_EQUAL(3, getMsgSequence(c1.subs.get("q", TIMEOUT))); } QPID_AUTO_TEST_CASE(testTxTransaction) { @@ -160,14 +201,14 @@ commitSession.txSelect(); commitSession.messageTransfer(arg::content=Message("a", "q")); commitSession.messageTransfer(arg::content=Message("b", "q")); - BOOST_CHECK_EQUAL(commitSubs.get("q", TIME_SEC).getData(), "A"); + BOOST_CHECK_EQUAL(commitSubs.get("q", TIMEOUT).getData(), "A"); // Start a transaction that will roll back. Session rollbackSession = c0.connection.newSession("rollback"); SubscriptionManager rollbackSubs(rollbackSession); rollbackSession.txSelect(); rollbackSession.messageTransfer(arg::content=Message("1", "q")); - Message rollbackMessage = rollbackSubs.get("q", TIME_SEC); + Message rollbackMessage = rollbackSubs.get("q", TIMEOUT); BOOST_CHECK_EQUAL(rollbackMessage.getData(), "B"); BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u); @@ -191,10 +232,10 @@ // Verify queue status: just the comitted messages and dequeues should remain. 
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 4u); - BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "B"); - BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "a"); - BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "b"); - BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "c"); + BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "B"); + BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "a"); + BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "b"); + BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "c"); } QPID_AUTO_TEST_CASE(testUnacked) { @@ -210,7 +251,7 @@ c0.session.messageTransfer(arg::content=Message("11","q1")); LocalQueue q1; c0.subs.subscribe(q1, "q1", manualAccept); - BOOST_CHECK_EQUAL(q1.get(TIME_SEC).getData(), "11"); // Acquired but not accepted + BOOST_CHECK_EQUAL(q1.get(TIMEOUT).getData(), "11"); // Acquired but not accepted BOOST_CHECK_EQUAL(c0.session.queueQuery("q1").getMessageCount(), 0u); // Gone from queue // Create unacked message: not acquired, accepted or completeed. @@ -220,12 +261,12 @@ c0.session.messageTransfer(arg::content=Message("22","q2")); LocalQueue q2; c0.subs.subscribe(q2, "q2", manualAcquire); - m = q2.get(TIME_SEC); // Not acquired or accepted, still on queue + m = q2.get(TIMEOUT); // Not acquired or accepted, still on queue BOOST_CHECK_EQUAL(m.getData(), "21"); BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 2u); // Not removed c0.subs.getSubscription("q2").acquire(m); // Acquire manually BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // Removed - BOOST_CHECK_EQUAL(q2.get(TIME_SEC).getData(), "22"); // Not acquired or accepted, still on queue + BOOST_CHECK_EQUAL(q2.get(TIMEOUT).getData(), "22"); // Not acquired or accepted, still on queue BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // 1 not acquired. // Create empty credit record: acquire and accept but don't complete. 
@@ -235,7 +276,7 @@ c0.session.messageTransfer(arg::content=Message("32", "q3")); LocalQueue q3; c0.subs.subscribe(q3, "q3", manualComplete); - Message m31=q3.get(TIME_SEC); + Message m31=q3.get(TIMEOUT); BOOST_CHECK_EQUAL(m31.getData(), "31"); // Automatically acquired & accepted but not completed. BOOST_CHECK_EQUAL(c0.session.queueQuery("q3").getMessageCount(), 1u); @@ -251,7 +292,7 @@ // Complete the empty credit message, should unblock the message behind it. BOOST_CHECK_THROW(q3.get(0), Exception); c0.session.markCompleted(SequenceSet(m31.getId()), true); - BOOST_CHECK_EQUAL(q3.get(TIME_SEC).getData(), "32"); + BOOST_CHECK_EQUAL(q3.get(TIMEOUT).getData(), "32"); BOOST_CHECK_EQUAL(c0.session.queueQuery("q3").getMessageCount(), 0u); BOOST_CHECK_EQUAL(c1.session.queueQuery("q3").getMessageCount(), 0u); @@ -260,9 +301,9 @@ BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getMessageCount(), 1u); BOOST_CHECK_EQUAL(c1.session.queueQuery("q2").getMessageCount(), 2u); - BOOST_CHECK_EQUAL(c1.subs.get("q1", TIME_SEC).getData(), "11"); - BOOST_CHECK_EQUAL(c1.subs.get("q2", TIME_SEC).getData(), "21"); - BOOST_CHECK_EQUAL(c1.subs.get("q2", TIME_SEC).getData(), "22"); + BOOST_CHECK_EQUAL(c1.subs.get("q1", TIMEOUT).getData(), "11"); + BOOST_CHECK_EQUAL(c1.subs.get("q2", TIMEOUT).getData(), "21"); + BOOST_CHECK_EQUAL(c1.subs.get("q2", TIMEOUT).getData(), "22"); } QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) { @@ -276,7 +317,7 @@ c0.session.messageTransfer(arg::content=Message("1","q")); c0.session.messageTransfer(arg::content=Message("2","q")); Message m; - BOOST_CHECK(c0.subs.get(m, "q", TIME_SEC)); + BOOST_CHECK(c0.subs.get(m, "q", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "1"); // New member, TX not comitted, c1 should see nothing. @@ -287,7 +328,7 @@ // After commit c1 shoudl see results of tx. 
c0.session.txCommit(); BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u); - BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); + BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "2"); // Another transaction with both members active. @@ -295,7 +336,7 @@ BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u); c0.session.txCommit(); BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u); - BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); + BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "3"); } @@ -318,7 +359,7 @@ // No reliable way to ensure the partial message has arrived // before we start the new broker, so we sleep. - ::usleep(2500); + sys::usleep(2500); cluster.add(); // Send final 2 frames of message. @@ -328,7 +369,7 @@ // Verify message is enqued correctly on second member. Message m; Client c1(cluster[1], "c1"); - BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); + BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "abcd"); BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection).size()); } @@ -391,20 +432,20 @@ // Activate the subscription, ensure message removed on all queues. c0.subs.setFlowControl("q", FlowControl::unlimited()); Message m; - BOOST_CHECK(c0.lq.get(m, TIME_SEC)); + BOOST_CHECK(c0.lq.get(m, TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "aaa"); BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u); BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 0u); // Check second subscription's flow control: gets first message, not second. 
- BOOST_CHECK(lp.get(m, TIME_SEC)); + BOOST_CHECK(lp.get(m, TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "bbb"); BOOST_CHECK_EQUAL(c0.session.queueQuery("p").getMessageCount(), 1u); BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 1u); BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 1u); - BOOST_CHECK(c0.subs.get(m, "p", TIME_SEC)); + BOOST_CHECK(c0.subs.get(m, "p", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "ccc"); // Kill the subscribing member, ensure further messages are not removed. @@ -412,7 +453,7 @@ BOOST_REQUIRE_EQUAL(knownBrokerPorts(c1.connection, 2).size(), 2u); for (int i = 0; i < 10; ++i) { c1.session.messageTransfer(arg::content=Message("xxx", "q")); - BOOST_REQUIRE(c1.subs.get(m, "q", TIME_SEC)); + BOOST_REQUIRE(c1.subs.get(m, "q", TIMEOUT)); BOOST_REQUIRE_EQUAL(m.getData(), "xxx"); } } @@ -426,7 +467,7 @@ c0.session.messageTransfer(arg::content=Message("foo","q")); c0.session.messageTransfer(arg::content=Message("bar","q")); while (c0.session.queueQuery("q").getMessageCount() != 2) - ::usleep(1000); // Wait for message to show up on broker 0. + sys::usleep(1000); // Wait for message to show up on broker 0. // Add a new broker, it should catch up. cluster.add(); @@ -444,18 +485,18 @@ Client c1(cluster[1], "c1"); - BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); + BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "foo"); - BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); + BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "bar"); BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); // Add another broker, don't wait for join - should be stalled till ready. 
cluster.add(); Client c2(cluster[2], "c2"); - BOOST_CHECK(c2.subs.get(m, "p", TIME_SEC)); + BOOST_CHECK(c2.subs.get(m, "p", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "pfoo"); - BOOST_CHECK(c2.subs.get(m, "p", TIME_SEC)); + BOOST_CHECK(c2.subs.get(m, "p", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "pbar"); BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 0u); } @@ -488,9 +529,9 @@ c0.session.close(); Client c1(cluster[1]); Message msg; - BOOST_CHECK(c1.subs.get(msg, "q", qpid::sys::TIME_SEC)); + BOOST_CHECK(c1.subs.get(msg, "q", TIMEOUT)); BOOST_CHECK_EQUAL(string("foo"), msg.getData()); - BOOST_CHECK(c1.subs.get(msg, "q", qpid::sys::TIME_SEC)); + BOOST_CHECK(c1.subs.get(msg, "q", TIMEOUT)); BOOST_CHECK_EQUAL(string("bar"), msg.getData()); } @@ -535,9 +576,9 @@ // Check they arrived Message m; - BOOST_CHECK(c0.lq.get(m, sys::TIME_SEC)); + BOOST_CHECK(c0.lq.get(m, TIMEOUT)); BOOST_CHECK_EQUAL("foo", m.getData()); - BOOST_CHECK(c0.lq.get(m, sys::TIME_SEC)); + BOOST_CHECK(c0.lq.get(m, TIMEOUT)); BOOST_CHECK_EQUAL("bar", m.getData()); // Queue should be empty on all cluster members. 
Modified: qpid/trunk/qpid/cpp/xml/cluster.xml URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=742774&r1=742773&r2=742774&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/xml/cluster.xml (original) +++ qpid/trunk/qpid/cpp/xml/cluster.xml Mon Feb 9 22:25:26 2009 @@ -43,6 +43,10 @@ <control name="config-change" code="0x11" label="Raw cluster membership."> <field name="current" type="vbin16"/> <!-- packed member-id array --> </control> + + <control name="message-expired" code="0x12"> + <field name="id" type="uint64"/> + </control> <control name="shutdown" code="0x20" label="Shut down entire cluster"/> @@ -126,6 +130,7 @@ <control name="membership" code="0x21" label="Cluster membership details."> <field name="joiners" type="map"/> <!-- member-id -> URL --> <field name="members" type="map"/> <!-- member-id -> state --> + <field name="frame-id" type="uint64"/>> <!-- Frame id counter value --> </control> <!-- Set the position of a replicated queue. --> --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org