Author: aconway
Date: Mon Feb  9 22:25:26 2009
New Revision: 742774

URL: http://svn.apache.org/viewvc?rev=742774&view=rev
Log:
Cluster support for message time-to-live.

Added:
    qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp   (with props)
    qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.h   (with props)
    qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp   (with props)
    qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h   (with props)
Modified:
    qpid/trunk/qpid/cpp/examples/tradedemo/   (props changed)
    qpid/trunk/qpid/cpp/src/Makefile.am
    qpid/trunk/qpid/cpp/src/cluster.mk
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
    qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h
    qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h
    qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
    qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
    qpid/trunk/qpid/cpp/xml/cluster.xml

Propchange: qpid/trunk/qpid/cpp/examples/tradedemo/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Mon Feb  9 22:25:26 2009
@@ -0,0 +1 @@
+Makefile.in

Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=742774&r1=742773&r2=742774&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Mon Feb  9 22:25:26 2009
@@ -358,6 +358,8 @@
   qpid/broker/Broker.cpp \
   qpid/broker/BrokerSingleton.cpp \
   qpid/broker/Exchange.cpp \
+  qpid/broker/ExpiryPolicy.h \
+  qpid/broker/ExpiryPolicy.cpp \
   qpid/broker/Queue.cpp \
   qpid/broker/QueueCleaner.cpp \
   qpid/broker/QueueListeners.cpp \

Modified: qpid/trunk/qpid/cpp/src/cluster.mk
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.mk?rev=742774&r1=742773&r2=742774&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ qpid/trunk/qpid/cpp/src/cluster.mk Mon Feb  9 22:25:26 2009
@@ -63,6 +63,8 @@
   qpid/cluster/Event.h                         \
   qpid/cluster/EventFrame.h                    \
   qpid/cluster/EventFrame.cpp                  \
+  qpid/cluster/ExpiryPolicy.h                  \
+  qpid/cluster/ExpiryPolicy.cpp                        \
   qpid/cluster/FailoverExchange.cpp            \
   qpid/cluster/FailoverExchange.h              \
   qpid/cluster/Multicaster.cpp                 \

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=742774&r1=742773&r2=742774&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Mon Feb  9 22:25:26 2009
@@ -30,6 +30,7 @@
 #include "SecureConnectionFactory.h"
 #include "TopicExchange.h"
 #include "Link.h"
+#include "ExpiryPolicy.h"
 
 #include "qmf/org/apache/qpid/broker/Package.h"
 #include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h"
@@ -150,6 +151,7 @@
     queueCleaner(queues, timer),
     queueEvents(poller),
     recovery(true),
+    expiryPolicy(new ExpiryPolicy),
     getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
 {
     if (conf.enableMgmt) {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=742774&r1=742773&r2=742774&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Mon Feb  9 22:25:26 2009
@@ -36,6 +36,7 @@
 #include "Vhost.h"
 #include "System.h"
 #include "Timer.h"
+#include "ExpiryPolicy.h"
 #include "qpid/management/Manageable.h"
 #include "qpid/management/ManagementBroker.h"
 #include "qmf/org/apache/qpid/broker/Broker.h"
@@ -65,6 +66,8 @@
 
 namespace broker {
 
+class ExpiryPolicy;
+
 static const  uint16_t DEFAULT_PORT=5672;
 
 struct NoSuchTransportException : qpid::Exception
@@ -111,6 +114,8 @@
   private:
     typedef std::map<std::string, boost::shared_ptr<sys::ProtocolFactory> > 
ProtocolFactoryMap;
 
+    void declareStandardExchange(const std::string& name, const std::string& 
type);
+
     boost::shared_ptr<sys::Poller> poller;
     Options config;
     management::ManagementAgent::Singleton managementAgentSingleton;
@@ -132,14 +137,11 @@
     System::shared_ptr           systemObject;
     QueueCleaner queueCleaner;
     QueueEvents queueEvents;
-
-    void declareStandardExchange(const std::string& name, const std::string& 
type);
-
     std::vector<Url> knownBrokers;
     std::vector<Url> getKnownBrokersImpl();
     std::string federationTag;
-
     bool recovery;
+    boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
 
   public:
 
@@ -180,6 +182,9 @@
     Options& getOptions() { return config; }
     QueueEvents& getQueueEvents() { return queueEvents; }
 
+    void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { 
expiryPolicy = e; }
+    boost::intrusive_ptr<ExpiryPolicy> getExpiryPolicy() { return 
expiryPolicy; }
+    
     SessionManager& getSessionManager() { return sessionManager; }
     const std::string& getFederationTag() const { return federationTag; }
 

Added: qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp?rev=742774&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp Mon Feb  9 22:25:26 
2009
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ExpiryPolicy.h"
+#include "Message.h"
+#include "qpid/sys/Time.h"
+
+namespace qpid {
+namespace broker {
+
+ExpiryPolicy::~ExpiryPolicy() {}
+
+void ExpiryPolicy::willExpire(Message&) {}
+
+bool ExpiryPolicy::hasExpired(Message& m) {
+    return m.getExpiration() < sys::AbsTime::now();
+}
+
+}} // namespace qpid::broker

Propchange: qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.h?rev=742774&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.h Mon Feb  9 22:25:26 2009
@@ -0,0 +1,44 @@
+#ifndef QPID_BROKER_EXPIRYPOLICY_H
+#define QPID_BROKER_EXPIRYPOLICY_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/RefCounted.h"
+
+namespace qpid {
+namespace broker {
+
+class Message;
+
+/**
+ * Default expiry policy.
+ */
+class ExpiryPolicy : public RefCounted
+{
+  public:
+    virtual ~ExpiryPolicy();
+    virtual void willExpire(Message&);
+    virtual bool hasExpired(Message&);
+};
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_EXPIRYPOLICY_H*/

Propchange: qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=742774&r1=742773&r2=742774&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Mon Feb  9 22:25:26 2009
@@ -21,6 +21,7 @@
 
 #include "Message.h"
 #include "ExchangeRegistry.h"
+#include "ExpiryPolicy.h"
 #include "qpid/StringUtils.h"
 #include "qpid/framing/frame_functors.h"
 #include "qpid/framing/FieldTable.h"
@@ -316,24 +317,29 @@
     }
 }
 
-void Message::setTimestamp()
+void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e) 
 {
     DeliveryProperties* props = getProperties<DeliveryProperties>();    
-    //Spec states that timestamp should be set, evaluate the
-    //performance impact before re-enabling this:
-    //time_t now = ::time(0);
-    //props->setTimestamp(now);
     if (props->getTtl()) {
-        //set expiration (nb: ttl is in millisecs, time_t is in secs)
+        // AMQP requires setting the expiration property to be posix
+        // time_t in seconds. TTL is in milliseconds
         time_t now = ::time(0);
         props->setExpiration(now + (props->getTtl()/1000));
+        // Use higher resolution time for the internal expiry calculation.
         expiration = AbsTime(AbsTime::now(), Duration(props->getTtl() * 
TIME_MSEC));
+        setExpiryPolicy(e);
     }
 }
 
-bool Message::hasExpired() const
+void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) {
+    expiryPolicy = e;
+    if (expiryPolicy)
+        expiryPolicy->willExpire(*this);
+}
+
+bool Message::hasExpired()
 {
-    return expiration < FAR_FUTURE && expiration < AbsTime::now();
+    return expiryPolicy && expiryPolicy->hasExpired(*this);
 }
 
 boost::intrusive_ptr<Message>& Message::getReplacementMessage(const Queue* 
qfor) const

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=742774&r1=742773&r2=742774&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Mon Feb  9 22:25:26 2009
@@ -45,6 +45,7 @@
 class ExchangeRegistry;
 class MessageStore;
 class Queue;
+class ExpiryPolicy;
 
 class Message : public PersistableMessage {
 public:
@@ -73,8 +74,11 @@
     const framing::FieldTable* getApplicationHeaders() const;
     bool isPersistent();
     bool requiresAccept();
-    void setTimestamp();
-    bool hasExpired() const;
+
+    void setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e);
+    void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e);
+    bool hasExpired();
+    sys::AbsTime getExpiration() const { return expiration; }
 
     framing::FrameSet& getFrames() { return frames; } 
     const framing::FrameSet& getFrames() const { return frames; } 
@@ -171,6 +175,7 @@
     ConnectionToken* publisher;
     mutable MessageAdapter* adapter;
     qpid::sys::AbsTime expiration;
+    boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
 
     static TransferAdapter TRANSFER;
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=742774&r1=742773&r2=742774&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Mon Feb  9 22:25:26 
2009
@@ -358,14 +358,13 @@
     std::string exchangeName = msg->getExchangeName();
     //TODO: the following should be hidden behind message (using 
MessageAdapter or similar)
 
-    // Do not replace the delivery-properties.exchange if it is is already set.
-    // This is used internally (by the cluster) to force the exchange name on 
a message.
-    // The client library ensures this is always empty for messages from 
normal clients.
     if (msg->isA<MessageTransferBody>()) {
-        if (!msg->hasProperties<DeliveryProperties>() ||
-            msg->getProperties<DeliveryProperties>()->getExchange().empty())
+        // Do not replace the delivery-properties.exchange if it is already 
set.
+        // This is used internally (by the cluster) to force the exchange name 
on a message.
+        // The client library ensures this is always empty for messages from 
normal clients.
+        if (!msg->hasProperties<DeliveryProperties>() || 
msg->getProperties<DeliveryProperties>()->getExchange().empty())
             
msg->getProperties<DeliveryProperties>()->setExchange(exchangeName);
-        msg->setTimestamp();
+        msg->setTimestamp(getSession().getBroker().getExpiryPolicy());
     }
     if (!cacheExchange || cacheExchange->getName() != exchangeName){
         cacheExchange = session.getBroker().getExchanges().get(exchangeName);

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=742774&r1=742773&r2=742774&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Mon Feb  9 22:25:26 2009
@@ -76,6 +76,7 @@
     void ready(const std::string& url) { cluster.ready(member, url, l); }
     void configChange(const std::string& addresses) { 
cluster.configChange(member, addresses, l); }
     void updateOffer(uint64_t updatee, const Uuid& id) { 
cluster.updateOffer(member, updatee, id, l); }
+    void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); }
     void shutdown() { cluster.shutdown(member, l); }
 
     bool invoke(AMQBody& body) { return framing::invoke(*this, 
body).wasHandled(); }
@@ -103,6 +104,8 @@
                       poller),
     connections(*this),
     decoder(boost::bind(&PollableFrameQueue::push, &deliverFrameQueue, _1)),
+    expiryPolicy(new ExpiryPolicy(boost::bind(&Cluster::isLeader, this), 
mcast, myId, broker.getTimer())),
+    frameId(0),
     initialized(false),
     state(INIT),
     lastSize(0),
@@ -134,6 +137,7 @@
         myUrl = 
Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT));
     QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << 
myUrl);
     broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
+    broker.setExpiryPolicy(expiryPolicy);
     dispatcher.start();
     deliverEventQueue.start();
     deliverFrameQueue.start();
@@ -238,7 +242,8 @@
 
 // Handler for deliverFrameQueue
 void Cluster::deliveredFrame(const EventFrame& e) {
-    Mutex::ScopedLock l(lock); 
+    Mutex::ScopedLock l(lock);
+    const_cast<AMQFrame&>(e.frame).setClusterId(frameId++);
     QPID_LOG(trace, *this << " DLVR: " << e);
     QPID_LATENCY_RECORD("delivered frame queue", e.frame);
     if (e.isCluster()) {        // Cluster control frame
@@ -333,22 +338,23 @@
             state = JOINER;
             QPID_LOG(info, *this << " joining cluster: " << map);
             mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), 
myUrl.str()), myId);
-            ClusterMap::Set members = map.getAlive();
-            members.erase(myId);
-            myElders = members;
+            elders = map.getAlive();
+            elders.erase(myId);
             broker.getLinks().setPassive(true);
         }
     }
     else if (state >= READY && memberChange) {
         memberUpdate(l);
-        myElders = ClusterMap::intersection(myElders, map.getAlive());
-        if (myElders.empty()) {
+        elders = ClusterMap::intersection(elders, map.getAlive());
+        if (elders.empty()) {
             //assume we are oldest, reactive links if necessary
             broker.getLinks().setPassive(false);
         }
     }
 }
 
+bool Cluster::isLeader() const { return elders.empty(); }
+
 void Cluster::tryMakeOffer(const MemberId& id, Lock& ) {
     if (state == READY && map.isJoiner(id)) {
         state = OFFER;
@@ -420,15 +426,16 @@
     deliverFrameQueue.stop();
     if (updateThread.id()) updateThread.join(); // Join the previous 
updatethread.
     updateThread = Thread(
-        new UpdateClient(myId, updatee, url, broker, map, connections.values(),
-                       boost::bind(&Cluster::updateOutDone, this),
-                       boost::bind(&Cluster::updateOutError, this, _1)));
+        new UpdateClient(myId, updatee, url, broker, map, frameId, 
connections.values(),
+                         boost::bind(&Cluster::updateOutDone, this),
+                         boost::bind(&Cluster::updateOutError, this, _1)));
 }
 
 // Called in update thread.
-void Cluster::updateInDone(const ClusterMap& m) {
+void Cluster::updateInDone(const ClusterMap& m, uint64_t fid) {
     Lock l(lock);
     updatedMap = m;
+    frameId = fid;
     checkUpdateIn(l);
 }
 
@@ -573,4 +580,8 @@
     QPID_LOG(debug, *this << " cluster-id = " << clusterId);
 }
 
+void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
+    expiryPolicy->deliverExpire(id);
+}
+
 }} // namespace qpid::cluster

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=742774&r1=742773&r2=742774&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Mon Feb  9 22:25:26 2009
@@ -31,6 +31,7 @@
 #include "Quorum.h"
 #include "Decoder.h"
 #include "PollableQueue.h"
+#include "ExpiryPolicy.h"
 
 #include "qpid/broker/Broker.h"
 #include "qpid/sys/Monitor.h"
@@ -89,7 +90,7 @@
     void leave();
 
     // Update completed - called in update thread
-    void updateInDone(const ClusterMap&);
+    void updateInDone(const ClusterMap&, uint64_t frameId);
 
     MemberId getId() const;
     broker::Broker& getBroker() const;
@@ -100,6 +101,8 @@
 
     size_t getReadMax() { return readMax; }
     size_t getWriteEstimate() { return writeEstimate; }
+
+    bool isLeader() const;       // Called in deliver thread.
     
   private:
     typedef sys::Monitor::ScopedLock Lock;
@@ -129,6 +132,7 @@
     void updateOffer(const MemberId& updater, uint64_t updatee, const 
framing::Uuid&, Lock&);
     void ready(const MemberId&, const std::string&, Lock&);
     void configChange(const MemberId&, const std::string& addresses, Lock& l);
+    void messageExpired(const MemberId&, uint64_t, Lock& l);
     void shutdown(const MemberId&, Lock&);
     void deliveredEvent(const Event&); 
     void deliveredFrame(const EventFrame&); 
@@ -185,7 +189,6 @@
     const size_t writeEstimate;
     framing::Uuid clusterId;
     NoOpConnectionOutputHandler shadowOut;
-    ClusterMap::Set myElders;
     qpid::management::ManagementAgent* mAgent;
 
     // Thread safe members
@@ -197,8 +200,11 @@
     boost::shared_ptr<FailoverExchange> failoverExchange;
     Quorum quorum;
 
-    // Called only from event delivery thread
+    // Used only in delivery thread
     Decoder decoder;
+    ClusterMap::Set elders;
+    boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
+    uint64_t frameId;
 
     // Used only during initialization
     bool initialized;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=742774&r1=742773&r2=742774&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Mon Feb  9 22:25:26 
2009
@@ -54,7 +54,7 @@
     bool quorum;
     size_t readMax, writeEstimate;
 
-    ClusterValues() : quorum(false), readMax(3), writeEstimate(64) {}
+    ClusterValues() : quorum(false), readMax(10), writeEstimate(64) {}
   
     Url getUrl(uint16_t port) const {
         if (url.empty()) return Url::getIpAddressesUrl(port);

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=742774&r1=742773&r2=742774&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Mon Feb  9 22:25:26 2009
@@ -127,10 +127,6 @@
           case DTX_CLASS_ID: message = "DTX transactions are not currently 
supported by cluster."; break;
         }
     }
-    else if (body.type() == HEADER_BODY) {
-        const DeliveryProperties* dp = static_cast<const 
AMQHeaderBody&>(body).get<DeliveryProperties>();
-        if (dp && dp->getTtl()) message = "Message TTL is not currently 
supported by cluster.";
-    }
     if (!message.empty())
         connection.close(connection::CLOSE_CODE_FRAMING_ERROR, message);
     return !message.empty();
@@ -259,9 +255,9 @@
     self = shadow;
 }
 
-void Connection::membership(const FieldTable& joiners, const FieldTable& 
members) {
+void Connection::membership(const FieldTable& joiners, const FieldTable& 
members, uint64_t frameId) {
     QPID_LOG(debug, cluster << " incoming update complete on connection " << 
*this);
-    cluster.updateInDone(ClusterMap(joiners, members));
+    cluster.updateInDone(ClusterMap(joiners, members), frameId);
     self.second = 0;        // Mark this as completed update connection.
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=742774&r1=742773&r2=742774&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Mon Feb  9 22:25:26 2009
@@ -119,7 +119,7 @@
     
     void shadowReady(uint64_t memberId, uint64_t connectionId);
 
-    void membership(const framing::FieldTable&, const framing::FieldTable&);
+    void membership(const framing::FieldTable&, const framing::FieldTable&, 
uint64_t frameId);
 
     void deliveryRecord(const std::string& queue,
                         const framing::SequenceNumber& position,

Added: qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp?rev=742774&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp Mon Feb  9 22:25:26 
2009
@@ -0,0 +1,80 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ExpiryPolicy.h"
+#include "Multicaster.h"
+#include "qpid/framing/ClusterMessageExpiredBody.h"
+#include "qpid/sys/Time.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Timer.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace cluster {
+
+ExpiryPolicy::ExpiryPolicy(const boost::function<bool()> & f, Multicaster& m, 
const MemberId& id, broker::Timer& t)
+    : expiredPolicy(new Expired), isLeader(f), mcast(m), memberId(id), 
timer(t) {}
+
+namespace {
+uint64_t clusterId(const broker::Message& m) {
+    assert(m.getFrames().begin() != m.getFrames().end());
+    return m.getFrames().begin()->getClusterId();
+}
+
+struct ExpiryTask : public broker::TimerTask {
+    ExpiryTask(const boost::intrusive_ptr<ExpiryPolicy>& policy, uint64_t id, 
sys::AbsTime when)
+        : TimerTask(when), expiryPolicy(policy), messageId(id) {}
+    void fire() { expiryPolicy->sendExpire(messageId); }
+    boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
+    const uint64_t messageId;
+};
+}
+
+void ExpiryPolicy::willExpire(broker::Message& m) {
+    timer.add(new ExpiryTask(this, clusterId(m), m.getExpiration()));
+}
+
+bool ExpiryPolicy::hasExpired(broker::Message& m) {
+    sys::Mutex::ScopedLock l(lock);
+    IdSet::iterator i = expired.find(clusterId(m));
+    if (i != expired.end()) {
+        expired.erase(i);
+        const_cast<broker::Message&>(m).setExpiryPolicy(expiredPolicy); // 
hasExpired() == true; 
+        return true;
+    }
+    return false;
+}
+
+void ExpiryPolicy::sendExpire(uint64_t id) {
+    sys::Mutex::ScopedLock l(lock);
+    if (isLeader()) 
+        
mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(),
 id), memberId);
+}
+
+void ExpiryPolicy::deliverExpire(uint64_t id) {
+    sys::Mutex::ScopedLock l(lock);
+    expired.insert(id);
+}
+
+bool ExpiryPolicy::Expired::hasExpired(broker::Message&) { return true; }
+void ExpiryPolicy::Expired::willExpire(broker::Message&) { }
+
+}} // namespace qpid::cluster

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h?rev=742774&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h Mon Feb  9 22:25:26 2009
@@ -0,0 +1,76 @@
+#ifndef QPID_CLUSTER_EXPIRYPOLICY_H
+#define QPID_CLUSTER_EXPIRYPOLICY_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+#include "qpid/broker/ExpiryPolicy.h"
+#include "qpid/sys/Mutex.h"
+#include <boost/function.hpp>
+#include <boost/intrusive_ptr.hpp>
+#include <set>
+
+namespace qpid {
+
+namespace broker { class Timer; }
+
+namespace cluster {
+class Multicaster;
+
+/**
+ * Cluster expiry policy
+ */
+class ExpiryPolicy : public broker::ExpiryPolicy
+{
+  public:
+    ExpiryPolicy(const boost::function<bool()> & isLeader, Multicaster&, const 
MemberId&, broker::Timer&);
+
+    void willExpire(broker::Message&);
+
+    bool hasExpired(broker::Message&);
+
+    // Send expiration notice to cluster.
+    void sendExpire(uint64_t);
+
+    // Cluster delivers expiry notice.
+    void deliverExpire(uint64_t);
+
+  private:
+    sys::Mutex lock;
+    typedef std::set<uint64_t> IdSet;
+
+    struct Expired : public broker::ExpiryPolicy {
+        bool hasExpired(broker::Message&);
+        void willExpire(broker::Message&);
+    };
+
+    IdSet expired;
+    boost::intrusive_ptr<Expired> expiredPolicy;
+    boost::function<bool()> isLeader;
+    Multicaster& mcast;
+    MemberId memberId;
+    broker::Timer& timer;
+};
+
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_EXPIRYPOLICY_H*/

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=742774&r1=742773&r2=742774&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Mon Feb  9 22:25:26 
2009
@@ -86,10 +86,12 @@
 // TODO aconway 2008-09-24: optimization: update connections/sessions in 
parallel.
 
 UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, 
const Url& url,
-                       broker::Broker& broker, const ClusterMap& m, const 
Cluster::Connections& cons,
-                       const boost::function<void()>& ok,
-                       const boost::function<void(const std::exception&)>& 
fail)
-    : updaterId(updater), updateeId(updatee), updateeUrl(url), 
updaterBroker(broker), map(m), connections(cons), 
+                           broker::Broker& broker, const ClusterMap& m, 
uint64_t frameId_,
+                           const Cluster::Connections& cons,
+                           const boost::function<void()>& ok,
+                           const boost::function<void(const std::exception&)>& 
fail)
+    : updaterId(updater), updateeId(updatee), updateeUrl(url), 
updaterBroker(broker), map(m),
+      frameId(frameId_), connections(cons), 
       connection(catchUpConnection()), shadowConnection(catchUpConnection()),
       done(ok), failed(fail)
 {
@@ -120,6 +122,7 @@
 
     ClusterConnectionMembershipBody membership;
     map.toMethodBody(membership);
+    membership.setFrameId(frameId);
     AMQFrame frame(membership);
     client::ConnectionAccess::getImpl(connection)->handle(frame);
     connection.close();

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h?rev=742774&r1=742773&r2=742774&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h Mon Feb  9 22:25:26 2009
@@ -63,9 +63,10 @@
     static const std::string UPDATE; // Name for special update queue and 
exchange.
     
     UpdateClient(const MemberId& updater, const MemberId& updatee, const Url&,
-               broker::Broker& donor, const ClusterMap& map, const 
std::vector<boost::intrusive_ptr<Connection> >& ,
-               const boost::function<void()>& done,
-               const boost::function<void(const std::exception&)>& fail);
+                 broker::Broker& donor, const ClusterMap& map, uint64_t 
sequence,
+                 const std::vector<boost::intrusive_ptr<Connection> >& ,
+                 const boost::function<void()>& done,
+                 const boost::function<void(const std::exception&)>& fail);
 
     ~UpdateClient();
     void update();
@@ -89,6 +90,7 @@
     Url updateeUrl;
     broker::Broker& updaterBroker;
     ClusterMap map;
+    uint64_t frameId;
     std::vector<boost::intrusive_ptr<Connection> > connections;
     client::Connection connection, shadowConnection;
     client::AsyncSession session, shadowSession;

Modified: qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h?rev=742774&r1=742773&r2=742774&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h Mon Feb  9 22:25:26 2009
@@ -92,6 +92,9 @@
     /** Must point to at least DECODE_SIZE_MIN bytes of data */
     static uint16_t decodeSize(char* data);
 
+    uint64_t getClusterId() const { return clusterId; } // Returns the clusterId member (the value last passed to setClusterId()).
+    void setClusterId(uint64_t id) { clusterId = id; } // Stores id in the clusterId member.
+    
   private:
     void init();
 
@@ -103,6 +106,7 @@
     bool bos : 1;
     bool eos : 1;
     mutable uint32_t encodedSizeCache;
+    uint64_t clusterId;         // Used to identify frames in a clustered 
broker.
 };
 
 std::ostream& operator<<(std::ostream&, const AMQFrame&);

Modified: qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h?rev=742774&r1=742773&r2=742774&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h Mon Feb  9 22:25:26 2009
@@ -49,6 +49,7 @@
     bool isComplete() const;
 
     uint64_t getContentSize() const;
+
     void getContent(std::string&) const;
     std::string getContent() const;
 
@@ -73,6 +74,9 @@
         return header ? header->get<T>() : 0;
     }
 
+    Frames::const_iterator begin() const { return parts.begin(); } // Read-only iterator to the first element of parts.
+    Frames::const_iterator end() const { return parts.end(); } // Read-only past-the-end iterator of parts.
+    
     const SequenceNumber& getId() const { return id; }
 
     template <class P> void remove(P predicate) {

Modified: qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=742774&r1=742773&r2=742774&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Mon Feb  9 22:25:26 2009
@@ -26,6 +26,7 @@
 #include "qpid/broker/ExchangeRegistry.h"
 #include "qpid/broker/QueueRegistry.h"
 #include "qpid/broker/NullMessageStore.h"
+#include "qpid/broker/ExpiryPolicy.h"
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/client/QueueOptions.h"
 #include <iostream>
@@ -491,7 +492,7 @@
         } else {
             if (evenTtl) 
m->getProperties<DeliveryProperties>()->setTtl(evenTtl);
         }
-        m->setTimestamp();
+        m->setTimestamp(new broker::ExpiryPolicy);
         queue.deliver(m);
     }
 }

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=742774&r1=742773&r2=742774&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Mon Feb  9 22:25:26 2009
@@ -37,6 +37,7 @@
 
 #include <boost/bind.hpp>
 #include <boost/shared_ptr.hpp>
+#include <boost/assign.hpp>
 
 #include <string>
 #include <iostream>
@@ -51,22 +52,23 @@
 ostream& operator<<(ostream& o, const std::set<T>& s) { return seqPrint(o, s); 
}
 }
 
-
-QPID_AUTO_TEST_SUITE(cluster)
+QPID_AUTO_TEST_SUITE(cluster_test)
 
 using namespace std;
 using namespace qpid;
 using namespace qpid::cluster;
 using namespace qpid::framing;
 using namespace qpid::client;
-using qpid::sys::TIME_SEC;
-using qpid::broker::Broker;
+using namespace boost::assign;
+using broker::Broker;
 using boost::shared_ptr;
-using qpid::cluster::Cluster;
+
+// Timeout for tests that wait for messages
+const sys::Duration TIMEOUT=sys::TIME_SEC/4;
 
 
 ostream& operator<<(ostream& o, const cpg_name* n) {
-    return o << qpid::cluster::Cpg::str(*n);
+    return o << cluster::Cpg::str(*n);
 }
 
 ostream& operator<<(ostream& o, const cpg_address& a) {
@@ -94,7 +96,7 @@
         BOOST_MESSAGE("knownBrokerPorts waiting for " << n << ": " << urls);
         // Retry up to 10 secs in .1 second intervals.
         for (size_t retry=100; urls.size() != unsigned(n) && retry != 0; 
--retry) {
-            ::usleep(1000*100); // 0.1 secs
+            sys::usleep(1000*100); // 0.1 secs
             urls = source.getKnownBrokers();
         }
     }
@@ -127,6 +129,45 @@
     return 
m.getMessageProperties().getApplicationHeaders().getAsInt64("qpid.msg_sequence");
 }
 
+Message ttlMessage(const std::string& data, const std::string& key, uint64_t 
ttl) { // Test helper: builds Message(data, key) and sets ttl on its delivery properties.
+    Message m(data, key);
+    m.getDeliveryProperties().setTtl(ttl);
+    return m;
+}
+
+vector<std::string> browse(Client& c, const std::string& q, int n) { // Test helper: returns the data of n messages read from queue q via client c.
+    SubscriptionSettings browseSettings(
+        FlowControl::unlimited(),
+        ACCEPT_MODE_NONE,
+        ACQUIRE_MODE_NOT_ACQUIRED,
+        0                       // No auto-ack.
+    );
+    LocalQueue lq;
+    c.subs.subscribe(lq, q, browseSettings);
+    vector<std::string> result;
+    for (int i = 0; i < n; ++i) {
+        result.push_back(lq.get(TIMEOUT).getData()); // Each get is bounded by TIMEOUT.
+    }
+    c.subs.getSubscription(q).cancel(); // Drop the subscription created above before returning.
+    return result;
+}
+
+QPID_AUTO_TEST_CASE(testMessageTimeToLive) {
+    // Note: this doesn't actually test for cluster race conditions around TTL,
+    // it just verifies that basic TTL functionality works: "a" (TTL 200) and "b" (no TTL) are
+    // sent via c0; after a 300ms sleep only "b" should still be browsable through either client.
+    ClusterFixture cluster(2);
+    Client c0(cluster[0], "c0");
+    Client c1(cluster[1], "c1");
+    c0.session.queueDeclare("q");
+    c0.session.messageTransfer(arg::content=ttlMessage("a", "q", 200));
+    c0.session.messageTransfer(arg::content=Message("b", "q"));
+    BOOST_CHECK_EQUAL(browse(c1, "q", 2), list_of<std::string>("a")("b"));
+    sys::usleep(300*1000); 
+    BOOST_CHECK_EQUAL(browse(c0, "q", 1), list_of<std::string>("b"));
+    BOOST_CHECK_EQUAL(browse(c1, "q", 1), list_of<std::string>("b"));
+}
+
 QPID_AUTO_TEST_CASE(testSequenceOptions) {
     // Make sure the exchange qpid.msg_sequence property is properly 
replicated.
     ClusterFixture cluster(1);
@@ -138,13 +179,13 @@
     c0.session.exchangeBind(arg::exchange="ex", arg::queue="q", 
arg::bindingKey="k");
     c0.session.messageTransfer(arg::content=Message("1", "k"), 
arg::destination="ex");
     c0.session.messageTransfer(arg::content=Message("2", "k"), 
arg::destination="ex");
-    BOOST_CHECK_EQUAL(1, getMsgSequence(c0.subs.get("q", TIME_SEC)));
-    BOOST_CHECK_EQUAL(2, getMsgSequence(c0.subs.get("q", TIME_SEC)));
+    BOOST_CHECK_EQUAL(1, getMsgSequence(c0.subs.get("q", TIMEOUT)));
+    BOOST_CHECK_EQUAL(2, getMsgSequence(c0.subs.get("q", TIMEOUT)));
 
     cluster.add();
     Client c1(cluster[1]);
     c1.session.messageTransfer(arg::content=Message("3", "k"), 
arg::destination="ex");    
-    BOOST_CHECK_EQUAL(3, getMsgSequence(c1.subs.get("q", TIME_SEC)));
+    BOOST_CHECK_EQUAL(3, getMsgSequence(c1.subs.get("q", TIMEOUT)));
 }
 
 QPID_AUTO_TEST_CASE(testTxTransaction) {
@@ -160,14 +201,14 @@
     commitSession.txSelect();
     commitSession.messageTransfer(arg::content=Message("a", "q"));
     commitSession.messageTransfer(arg::content=Message("b", "q"));
-    BOOST_CHECK_EQUAL(commitSubs.get("q", TIME_SEC).getData(), "A");
+    BOOST_CHECK_EQUAL(commitSubs.get("q", TIMEOUT).getData(), "A");
 
     // Start a transaction that will roll back.
     Session rollbackSession = c0.connection.newSession("rollback");
     SubscriptionManager rollbackSubs(rollbackSession);
     rollbackSession.txSelect();
     rollbackSession.messageTransfer(arg::content=Message("1", "q"));
-    Message rollbackMessage = rollbackSubs.get("q", TIME_SEC);
+    Message rollbackMessage = rollbackSubs.get("q", TIMEOUT);
     BOOST_CHECK_EQUAL(rollbackMessage.getData(), "B");
 
     BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u);
@@ -191,10 +232,10 @@
 
     // Verify queue status: just the comitted messages and dequeues should 
remain.
     BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 4u);
-    BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "B");
-    BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "a");
-    BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "b");
-    BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "c");
+    BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "B");
+    BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "a");
+    BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "b");
+    BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "c");
 }
 
 QPID_AUTO_TEST_CASE(testUnacked) {
@@ -210,7 +251,7 @@
     c0.session.messageTransfer(arg::content=Message("11","q1"));
     LocalQueue q1;
     c0.subs.subscribe(q1, "q1", manualAccept);
-    BOOST_CHECK_EQUAL(q1.get(TIME_SEC).getData(), "11"); // Acquired but not 
accepted
+    BOOST_CHECK_EQUAL(q1.get(TIMEOUT).getData(), "11"); // Acquired but not 
accepted
     BOOST_CHECK_EQUAL(c0.session.queueQuery("q1").getMessageCount(), 0u); // 
Gone from queue
 
     // Create unacked message: not acquired, accepted or completeed.
@@ -220,12 +261,12 @@
     c0.session.messageTransfer(arg::content=Message("22","q2"));
     LocalQueue q2;
     c0.subs.subscribe(q2, "q2", manualAcquire);
-    m = q2.get(TIME_SEC);  // Not acquired or accepted, still on queue
+    m = q2.get(TIMEOUT);  // Not acquired or accepted, still on queue
     BOOST_CHECK_EQUAL(m.getData(), "21");
     BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 2u); // 
Not removed
     c0.subs.getSubscription("q2").acquire(m); // Acquire manually
     BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // 
Removed
-    BOOST_CHECK_EQUAL(q2.get(TIME_SEC).getData(), "22"); // Not acquired or 
accepted, still on queue
+    BOOST_CHECK_EQUAL(q2.get(TIMEOUT).getData(), "22"); // Not acquired or 
accepted, still on queue
     BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // 1 
not acquired.
 
     // Create empty credit record: acquire and accept but don't complete.
@@ -235,7 +276,7 @@
     c0.session.messageTransfer(arg::content=Message("32", "q3"));
     LocalQueue q3;
     c0.subs.subscribe(q3, "q3", manualComplete);
-    Message m31=q3.get(TIME_SEC);
+    Message m31=q3.get(TIMEOUT);
     BOOST_CHECK_EQUAL(m31.getData(), "31"); // Automatically acquired & 
accepted but not completed.
     BOOST_CHECK_EQUAL(c0.session.queueQuery("q3").getMessageCount(), 1u);    
 
@@ -251,7 +292,7 @@
     // Complete the empty credit message, should unblock the message behind it.
     BOOST_CHECK_THROW(q3.get(0), Exception);
     c0.session.markCompleted(SequenceSet(m31.getId()), true);
-    BOOST_CHECK_EQUAL(q3.get(TIME_SEC).getData(), "32");
+    BOOST_CHECK_EQUAL(q3.get(TIMEOUT).getData(), "32");
     BOOST_CHECK_EQUAL(c0.session.queueQuery("q3").getMessageCount(), 0u);
     BOOST_CHECK_EQUAL(c1.session.queueQuery("q3").getMessageCount(), 0u);
     
@@ -260,9 +301,9 @@
     BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getMessageCount(), 1u);
     BOOST_CHECK_EQUAL(c1.session.queueQuery("q2").getMessageCount(), 2u);
 
-    BOOST_CHECK_EQUAL(c1.subs.get("q1", TIME_SEC).getData(), "11");
-    BOOST_CHECK_EQUAL(c1.subs.get("q2", TIME_SEC).getData(), "21");
-    BOOST_CHECK_EQUAL(c1.subs.get("q2", TIME_SEC).getData(), "22");
+    BOOST_CHECK_EQUAL(c1.subs.get("q1", TIMEOUT).getData(), "11");
+    BOOST_CHECK_EQUAL(c1.subs.get("q2", TIMEOUT).getData(), "21");
+    BOOST_CHECK_EQUAL(c1.subs.get("q2", TIMEOUT).getData(), "22");
 }
 
 QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) {
@@ -276,7 +317,7 @@
     c0.session.messageTransfer(arg::content=Message("1","q"));
     c0.session.messageTransfer(arg::content=Message("2","q"));
     Message m;
-    BOOST_CHECK(c0.subs.get(m, "q", TIME_SEC));
+    BOOST_CHECK(c0.subs.get(m, "q", TIMEOUT));
     BOOST_CHECK_EQUAL(m.getData(), "1");
 
     // New member, TX not comitted, c1 should see nothing.
@@ -287,7 +328,7 @@
     // After commit c1 shoudl see results of tx.
     c0.session.txCommit();
     BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 
1u);
-    BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
+    BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
     BOOST_CHECK_EQUAL(m.getData(), "2");
 
     // Another transaction with both members active.
@@ -295,7 +336,7 @@
     BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 
0u);
     c0.session.txCommit();
     BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 
1u);
-    BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
+    BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
     BOOST_CHECK_EQUAL(m.getData(), "3");
 }
 
@@ -318,7 +359,7 @@
 
     // No reliable way to ensure the partial message has arrived
     // before we start the new broker, so we sleep.
-    ::usleep(2500); 
+    sys::usleep(2500); 
     cluster.add();
 
     // Send final 2 frames of message.
@@ -328,7 +369,7 @@
     // Verify message is enqued correctly on second member.
     Message m;
     Client c1(cluster[1], "c1");
-    BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
+    BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
     BOOST_CHECK_EQUAL(m.getData(), "abcd");
     BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection).size());
 }
@@ -391,20 +432,20 @@
     // Activate the subscription, ensure message removed on all queues. 
     c0.subs.setFlowControl("q", FlowControl::unlimited());
     Message m;
-    BOOST_CHECK(c0.lq.get(m, TIME_SEC));
+    BOOST_CHECK(c0.lq.get(m, TIMEOUT));
     BOOST_CHECK_EQUAL(m.getData(), "aaa");
     BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u);
     BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
     BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 0u);
 
     // Check second subscription's flow control: gets first message, not 
second.
-    BOOST_CHECK(lp.get(m, TIME_SEC));
+    BOOST_CHECK(lp.get(m, TIMEOUT));
     BOOST_CHECK_EQUAL(m.getData(), "bbb");
     BOOST_CHECK_EQUAL(c0.session.queueQuery("p").getMessageCount(), 1u);
     BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 1u);
     BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 1u);
 
-    BOOST_CHECK(c0.subs.get(m, "p", TIME_SEC));
+    BOOST_CHECK(c0.subs.get(m, "p", TIMEOUT));
     BOOST_CHECK_EQUAL(m.getData(), "ccc");
     
     // Kill the subscribing member, ensure further messages are not removed.
@@ -412,7 +453,7 @@
     BOOST_REQUIRE_EQUAL(knownBrokerPorts(c1.connection, 2).size(), 2u);
     for (int i = 0; i < 10; ++i) {
         c1.session.messageTransfer(arg::content=Message("xxx", "q"));
-        BOOST_REQUIRE(c1.subs.get(m, "q", TIME_SEC));
+        BOOST_REQUIRE(c1.subs.get(m, "q", TIMEOUT));
         BOOST_REQUIRE_EQUAL(m.getData(), "xxx");
     }
 }
@@ -426,7 +467,7 @@
     c0.session.messageTransfer(arg::content=Message("foo","q"));
     c0.session.messageTransfer(arg::content=Message("bar","q"));
     while (c0.session.queueQuery("q").getMessageCount() != 2)
-        ::usleep(1000);    // Wait for message to show up on broker 0.
+        sys::usleep(1000);    // Wait for message to show up on broker 0.
 
     // Add a new broker, it should catch up.
     cluster.add();
@@ -444,18 +485,18 @@
 
     Client c1(cluster[1], "c1");
 
-    BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
+    BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
     BOOST_CHECK_EQUAL(m.getData(), "foo");
-    BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
+    BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
     BOOST_CHECK_EQUAL(m.getData(), "bar");
     BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
 
     // Add another broker, don't wait for join - should be stalled till ready.
     cluster.add();
     Client c2(cluster[2], "c2");
-    BOOST_CHECK(c2.subs.get(m, "p", TIME_SEC));
+    BOOST_CHECK(c2.subs.get(m, "p", TIMEOUT));
     BOOST_CHECK_EQUAL(m.getData(), "pfoo");
-    BOOST_CHECK(c2.subs.get(m, "p", TIME_SEC));
+    BOOST_CHECK(c2.subs.get(m, "p", TIMEOUT));
     BOOST_CHECK_EQUAL(m.getData(), "pbar");
     BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 0u);
 }
@@ -488,9 +529,9 @@
     c0.session.close();
     Client c1(cluster[1]);
     Message msg;
-    BOOST_CHECK(c1.subs.get(msg, "q", qpid::sys::TIME_SEC));
+    BOOST_CHECK(c1.subs.get(msg, "q", TIMEOUT));
     BOOST_CHECK_EQUAL(string("foo"), msg.getData());
-    BOOST_CHECK(c1.subs.get(msg, "q", qpid::sys::TIME_SEC));
+    BOOST_CHECK(c1.subs.get(msg, "q", TIMEOUT));
     BOOST_CHECK_EQUAL(string("bar"), msg.getData());
 }
 
@@ -535,9 +576,9 @@
 
     // Check they arrived
     Message m;
-    BOOST_CHECK(c0.lq.get(m, sys::TIME_SEC));
+    BOOST_CHECK(c0.lq.get(m, TIMEOUT));
     BOOST_CHECK_EQUAL("foo", m.getData());
-    BOOST_CHECK(c0.lq.get(m, sys::TIME_SEC));
+    BOOST_CHECK(c0.lq.get(m, TIMEOUT));
     BOOST_CHECK_EQUAL("bar", m.getData());
 
     // Queue should be empty on all cluster members.

Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=742774&r1=742773&r2=742774&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Mon Feb  9 22:25:26 2009
@@ -43,6 +43,10 @@
     <control name="config-change" code="0x11" label="Raw cluster membership.">
       <field name="current" type="vbin16"/> <!-- packed member-id array -->
     </control>
+
+    <control name="message-expired" code="0x12">
+      <field name="id" type="uint64"/>
+    </control>
     
     <control name="shutdown" code="0x20" label="Shut down entire cluster"/>
 
@@ -126,6 +130,7 @@
     <control name="membership" code="0x21" label="Cluster membership details.">
       <field name="joiners" type="map"/> <!-- member-id -> URL -->
       <field name="members" type="map"/> <!-- member-id -> state -->
+      <field name="frame-id" type="uint64"/> <!-- Frame id counter value -->
     </control>
 
     <!-- Set the position of a replicated queue. -->



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to