Author: aconway
Date: Mon Mar 9 17:03:40 2009
New Revision: 751760
URL: http://svn.apache.org/viewvc?rev=751760&view=rev
Log:
Fix cluster TTL: replicte expiry information to newcomers.
Modified:
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp
qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h
qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
qpid/trunk/qpid/cpp/src/tests/start_cluster
qpid/trunk/qpid/cpp/xml/cluster.xml
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=751760&r1=751759&r2=751760&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Mon Mar 9 17:03:40 2009
@@ -111,7 +111,6 @@
decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
discarding(true),
state(INIT),
- frameId(0),
lastSize(0),
lastBroker(false)
{
@@ -267,9 +266,6 @@
}
else if (state >= CATCHUP) {
QPID_LOG(trace, *this << " DLVR: " << e);
- EventFrame ef(e); // Non-const copy
- if (ef.type == DATA) // Add cluster-id to to data frames.
- ef.frame.setClusterId(frameId++);
ConnectionPtr connection = getConnection(e.connectionId, l);
if (connection)
connection->deliveredFrame(e);
@@ -475,18 +471,16 @@
cs.password = settings.password;
cs.mechanism = settings.mechanism;
updateThread = Thread(
- new UpdateClient(self, updatee, url, broker, map, frameId,
getConnections(l), decoder,
+ new UpdateClient(self, updatee, url, broker, map, *expiryPolicy,
getConnections(l), decoder,
boost::bind(&Cluster::updateOutDone, this),
boost::bind(&Cluster::updateOutError, this, _1),
cs));
}
// Called in update thread.
-void Cluster::updateInDone(const ClusterMap& m, uint64_t frameId_) {
+void Cluster::updateInDone(const ClusterMap& m) {
Lock l(lock);
updatedMap = m;
- // Safe to set frameId here because we are stalled: deliveredFrame cannot
be called concurrently.
- frameId = frameId_;
checkUpdateIn(l);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=751760&r1=751759&r2=751760&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Mon Mar 9 17:03:40 2009
@@ -92,7 +92,7 @@
void leave();
// Update completed - called in update thread
- void updateInDone(const ClusterMap&, uint64_t frameId);
+ void updateInDone(const ClusterMap&);
MemberId getId() const;
broker::Broker& getBroker() const;
@@ -108,6 +108,8 @@
// Called only during update by Connection::shadowReady
Decoder& getDecoder() { return decoder; }
+ ExpiryPolicy& getExpiryPolicy() { return *expiryPolicy; }
+
private:
typedef sys::Monitor::ScopedLock Lock;
@@ -115,8 +117,6 @@
typedef PollableQueue<EventFrame> PollableFrameQueue;
typedef std::map<ConnectionId, ConnectionPtr> ConnectionMap;
- // FIXME aconway 2009-03-07: sort functions by thread
-
// NB: A dummy Lock& parameter marks functions that must only be
// called with Cluster::lock locked.
@@ -237,7 +237,6 @@
} state;
ConnectionMap connections;
- uint64_t frameId;
ClusterMap map;
ClusterMap::Set elders;
size_t lastSize;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=751760&r1=751759&r2=751760&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Mon Mar 9 17:03:40 2009
@@ -279,9 +279,9 @@
cluster.getDecoder().get(self).setFragment(fragment.data(),
fragment.size());
}
-void Connection::membership(const FieldTable& joiners, const FieldTable&
members, uint64_t frameId) {
+void Connection::membership(const FieldTable& joiners, const FieldTable&
members) {
QPID_LOG(debug, cluster << " incoming update complete on connection " <<
*this);
- cluster.updateInDone(ClusterMap(joiners, members), frameId);
+ cluster.updateInDone(ClusterMap(joiners, members));
self.second = 0; // Mark this as completed update connection.
}
@@ -352,6 +352,10 @@
q->setPosition(position);
}
+void Connection::expiryId(uint64_t id) {
+ cluster.getExpiryPolicy().setId(id);
+}
+
std::ostream& operator<<(std::ostream& o, const Connection& c) {
const char* type="unknown";
if (c.isLocal()) type = "local";
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=751760&r1=751759&r2=751760&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Mon Mar 9 17:03:40 2009
@@ -120,7 +120,7 @@
void shadowReady(uint64_t memberId, uint64_t connectionId, const
std::string& username, const std::string& fragment);
- void membership(const framing::FieldTable&, const framing::FieldTable&,
uint64_t frameId);
+ void membership(const framing::FieldTable&, const framing::FieldTable&);
void deliveryRecord(const std::string& queue,
const framing::SequenceNumber& position,
@@ -135,6 +135,7 @@
uint32_t credit);
void queuePosition(const std::string&, const framing::SequenceNumber&);
+ void expiryId(uint64_t);
void txStart();
void txAccept(const framing::SequenceSet&);
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp?rev=751760&r1=751759&r2=751760&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp Mon Mar 9 17:03:40 2009
@@ -33,7 +33,9 @@
}
std::ostream& operator<<(std::ostream& o, const EventFrame& e) {
- return o << e.frame << " " << e.type << " " << e.connectionId << "
read-credit=" << e.readCredit;
+ return o << e.frame << " " << e.type << " " << e.connectionId;
+ if (e.readCredit) o << " read-credit=" << e.readCredit;
+ return o;
}
}} // namespace qpid::cluster
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp?rev=751760&r1=751759&r2=751760&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp Mon Mar 9 17:03:40
2009
@@ -31,46 +31,45 @@
namespace cluster {
ExpiryPolicy::ExpiryPolicy(Multicaster& m, const MemberId& id, broker::Timer&
t)
- : expiredPolicy(new Expired), mcast(m), memberId(id), timer(t) {}
-
-namespace {
-uint64_t clusterId(const broker::Message& m) {
- assert(m.getFrames().begin() != m.getFrames().end());
- return m.getFrames().begin()->getClusterId();
-}
+ : expiryId(0), expiredPolicy(new Expired), mcast(m), memberId(id),
timer(t) {}
struct ExpiryTask : public broker::TimerTask {
ExpiryTask(const boost::intrusive_ptr<ExpiryPolicy>& policy, uint64_t id,
sys::AbsTime when)
- : TimerTask(when), expiryPolicy(policy), messageId(id) {}
- void fire() { expiryPolicy->sendExpire(messageId); }
+ : TimerTask(when), expiryPolicy(policy), expiryId(id) {}
+ void fire() { expiryPolicy->sendExpire(expiryId); }
boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
- const uint64_t messageId;
+ const uint64_t expiryId;
};
-}
void ExpiryPolicy::willExpire(broker::Message& m) {
- timer.add(new ExpiryTask(this, clusterId(m), m.getExpiration()));
+ uint64_t id = expiryId++;
+ assert(unexpiredById.find(id) == unexpiredById.end());
+ assert(unexpiredByMessage.find(&m) == unexpiredByMessage.end());
+ unexpiredById[id] = &m;
+ unexpiredByMessage[&m] = id;
+ timer.add(new ExpiryTask(this, id, m.getExpiration()));
}
bool ExpiryPolicy::hasExpired(broker::Message& m) {
- sys::Mutex::ScopedLock l(lock);
- IdSet::iterator i = expired.find(clusterId(m));
- if (i != expired.end()) {
- expired.erase(i);
- const_cast<broker::Message&>(m).setExpiryPolicy(expiredPolicy); //
hasExpired() == true;
- return true;
- }
- return false;
+ return unexpiredByMessage.find(&m) == unexpiredByMessage.end();
}
void ExpiryPolicy::sendExpire(uint64_t id) {
- sys::Mutex::ScopedLock l(lock);
mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(),
id), memberId);
}
void ExpiryPolicy::deliverExpire(uint64_t id) {
- sys::Mutex::ScopedLock l(lock);
- expired.insert(id);
+ IdMessageMap::iterator i = unexpiredById.find(id);
+ if (i != unexpiredById.end()) {
+ i->second->setExpiryPolicy(expiredPolicy); // hasExpired() == true;
+ unexpiredByMessage.erase(i->second);
+ unexpiredById.erase(i);
+ }
+}
+
+boost::optional<uint64_t> ExpiryPolicy::getId(broker::Message& m) {
+ MessageIdMap::iterator i = unexpiredByMessage.find(&m);
+ return i == unexpiredByMessage.end() ? boost::optional<uint64_t>() :
i->second;
}
bool ExpiryPolicy::Expired::hasExpired(broker::Message&) { return true; }
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h?rev=751760&r1=751759&r2=751760&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h Mon Mar 9 17:03:40 2009
@@ -27,11 +27,15 @@
#include "qpid/sys/Mutex.h"
#include <boost/function.hpp>
#include <boost/intrusive_ptr.hpp>
-#include <set>
+#include <boost/optional.hpp>
+#include <map>
namespace qpid {
-namespace broker { class Timer; }
+namespace broker {
+class Timer;
+class Message;
+}
namespace cluster {
class Multicaster;
@@ -54,16 +58,23 @@
// Cluster delivers expiry notice.
void deliverExpire(uint64_t);
+ void setId(uint64_t id) { expiryId = id; }
+ uint64_t getId() const { return expiryId; }
+
+ boost::optional<uint64_t> getId(broker::Message&);
+
private:
- sys::Mutex lock;
- typedef std::set<uint64_t> IdSet;
+ typedef std::map<broker::Message*, uint64_t> MessageIdMap;
+ typedef std::map<uint64_t, broker::Message*> IdMessageMap;
struct Expired : public broker::ExpiryPolicy {
bool hasExpired(broker::Message&);
void willExpire(broker::Message&);
};
- IdSet expired;
+ MessageIdMap unexpiredByMessage;
+ IdMessageMap unexpiredById;
+ uint64_t expiryId;
boost::intrusive_ptr<Expired> expiredPolicy;
Multicaster& mcast;
MemberId memberId;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=751760&r1=751759&r2=751760&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Mon Mar 9 17:03:40
2009
@@ -23,6 +23,7 @@
#include "ClusterMap.h"
#include "Connection.h"
#include "Decoder.h"
+#include "ExpiryPolicy.h"
#include "qpid/client/SessionBase_0_10Access.h"
#include "qpid/client/ConnectionAccess.h"
#include "qpid/broker/Broker.h"
@@ -87,14 +88,14 @@
// TODO aconway 2008-09-24: optimization: update connections/sessions in
parallel.
UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee,
const Url& url,
- broker::Broker& broker, const ClusterMap& m,
uint64_t frameId_,
+ broker::Broker& broker, const ClusterMap& m,
ExpiryPolicy& expiry_,
const Cluster::ConnectionVector& cons, Decoder&
decoder_,
const boost::function<void()>& ok,
const boost::function<void(const std::exception&)>&
fail,
const client::ConnectionSettings& cs
)
: updaterId(updater), updateeId(updatee), updateeUrl(url),
updaterBroker(broker), map(m),
- frameId(frameId_), connections(cons), decoder(decoder_),
+ expiry(expiry_), connections(cons), decoder(decoder_),
connection(catchUpConnection()), shadowConnection(catchUpConnection()),
done(ok), failed(fail), connectionSettings(cs)
{
@@ -129,9 +130,9 @@
std::for_each(connections.begin(), connections.end(),
boost::bind(&UpdateClient::updateConnection, this, _1));
+ ClusterConnectionProxy(session).expiryId(expiry.getId());
ClusterConnectionMembershipBody membership;
map.toMethodBody(membership);
- membership.setFrameId(frameId);
AMQFrame frame(membership);
client::ConnectionAccess::getImpl(connection)->handle(frame);
connection.close();
@@ -150,8 +151,7 @@
void UpdateClient::updateExchange(const boost::shared_ptr<Exchange>& ex) {
QPID_LOG(debug, updaterId << " updating exchange " << ex->getName());
- ClusterConnectionProxy proxy(session);
- proxy.exchange(encode(*ex));
+ ClusterConnectionProxy(session).exchange(encode(*ex));
}
/** Bind a queue to the update exchange and update messges to it
@@ -162,10 +162,11 @@
bool haveLastPos;
framing::SequenceNumber lastPos;
client::AsyncSession session;
-
+ ExpiryPolicy& expiry;
+
public:
- MessageUpdater(const string& q, const client::AsyncSession s) : queue(q),
haveLastPos(false), session(s) {
+ MessageUpdater(const string& q, const client::AsyncSession s,
ExpiryPolicy& expiry_) : queue(q), haveLastPos(false), session(s),
expiry(expiry_) {
session.exchangeBind(queue, UpdateClient::UPDATE);
}
@@ -181,11 +182,20 @@
void updateQueuedMessage(const broker::QueuedMessage& message) {
+ // Send the queue position if necessary.
if (!haveLastPos || message.position - lastPos != 1) {
ClusterConnectionProxy(session).queuePosition(queue,
message.position.getValue()-1);
haveLastPos = true;
}
lastPos = message.position;
+
+ // Send the expiry ID if necessary.
+ if (message.payload->getProperties<DeliveryProperties>()->getTtl()) {
+ boost::optional<uint64_t> expiryId =
expiry.getId(*message.payload);
+ if (!expiryId) return; // Message already expired, don't replicate.
+ ClusterConnectionProxy(session).expiryId(*expiryId);
+ }
+
SessionBase_0_10Access sb(session);
framing::MessageTransferBody transfer(
framing::ProtocolVersion(), UpdateClient::UPDATE,
message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED);
@@ -214,7 +224,7 @@
QPID_LOG(debug, updaterId << " updating queue " << q->getName());
ClusterConnectionProxy proxy(session);
proxy.queue(encode(*q));
- MessageUpdater updater(q->getName(), session);
+ MessageUpdater updater(q->getName(), session, expiry);
q->eachMessage(boost::bind(&MessageUpdater::updateQueuedMessage, &updater,
_1));
q->eachBinding(boost::bind(&UpdateClient::updateBinding, this,
q->getName(), _1));
}
@@ -323,7 +333,7 @@
// If the message is acquired then it is no longer on the
// updatees queue, put it on the update queue for updatee to pick up.
//
- MessageUpdater(UPDATE,
shadowSession).updateQueuedMessage(dr.getMessage());
+ MessageUpdater(UPDATE, shadowSession,
expiry).updateQueuedMessage(dr.getMessage());
}
ClusterConnectionProxy(shadowSession).deliveryRecord(
dr.getQueue()->getName(),
@@ -342,8 +352,8 @@
class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater {
public:
- TxOpUpdater(UpdateClient& dc, client::AsyncSession s)
- : MessageUpdater(UpdateClient::UPDATE, s), parent(dc), session(s),
proxy(s) {}
+ TxOpUpdater(UpdateClient& dc, client::AsyncSession s, ExpiryPolicy& expiry)
+ : MessageUpdater(UpdateClient::UPDATE, s, expiry), parent(dc),
session(s), proxy(s) {}
void operator()(const broker::DtxAck& ) {
throw InternalErrorException("DTX transactions not currently supported
by cluster.");
@@ -386,7 +396,7 @@
broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer();
if (txBuffer) {
proxy.txStart();
- TxOpUpdater updater(*this, shadowSession);
+ TxOpUpdater updater(*this, shadowSession, expiry);
txBuffer->accept(updater);
proxy.txEnd();
}
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h?rev=751760&r1=751759&r2=751760&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h Mon Mar 9 17:03:40 2009
@@ -56,6 +56,7 @@
class Connection;
class ClusterMap;
class Decoder;
+class ExpiryPolicy;
/**
* A client that updates the contents of a local broker to a remote one using
AMQP.
@@ -65,7 +66,7 @@
static const std::string UPDATE; // Name for special update queue and
exchange.
UpdateClient(const MemberId& updater, const MemberId& updatee, const Url&,
- broker::Broker& donor, const ClusterMap& map, uint64_t
frameId,
+ broker::Broker& donor, const ClusterMap& map, ExpiryPolicy&
expiry,
const std::vector<boost::intrusive_ptr<Connection> >&,
Decoder&,
const boost::function<void()>& done,
const boost::function<void(const std::exception&)>& fail,
@@ -94,7 +95,7 @@
Url updateeUrl;
broker::Broker& updaterBroker;
ClusterMap map;
- uint64_t frameId;
+ ExpiryPolicy& expiry;
std::vector<boost::intrusive_ptr<Connection> > connections;
Decoder& decoder;
client::Connection connection, shadowConnection;
Modified: qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp?rev=751760&r1=751759&r2=751760&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp Mon Mar 9 17:03:40 2009
@@ -35,7 +35,6 @@
subchannel=0;
channel=0;
encodedSizeCache = 0;
- clusterId = 0;
}
AMQFrame::AMQFrame(const boost::intrusive_ptr<AMQBody>& b) : body(b) { init();
}
Modified: qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h?rev=751760&r1=751759&r2=751760&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h Mon Mar 9 17:03:40 2009
@@ -92,9 +92,6 @@
/** Must point to at least DECODE_SIZE_MIN bytes of data */
static uint16_t decodeSize(char* data);
- uint64_t getClusterId() const { return clusterId; }
- void setClusterId(uint64_t id) { clusterId = id; }
-
private:
void init();
@@ -106,7 +103,6 @@
bool bos : 1;
bool eos : 1;
mutable uint32_t encodedSizeCache;
- uint64_t clusterId; // Used to identify frames in a clustered
broekr.
};
std::ostream& operator<<(std::ostream&, const AMQFrame&);
Modified: qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=751760&r1=751759&r2=751760&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Mon Mar 9 17:03:40 2009
@@ -151,7 +151,10 @@
c.subs.subscribe(lq, q, browseSettings);
vector<string> result;
for (int i = 0; i < n; ++i) {
- result.push_back(lq.get(TIMEOUT).getData());
+ Message m;
+ if (!lq.get(m, TIMEOUT))
+ break;
+ result.push_back(m.getData());
}
c.subs.getSubscription(q).cancel();
return result;
@@ -202,13 +205,23 @@
ClusterFixture cluster(2);
Client c0(cluster[0], "c0");
Client c1(cluster[1], "c1");
+ c0.session.queueDeclare("p");
c0.session.queueDeclare("q");
c0.session.messageTransfer(arg::content=ttlMessage("a", "q", 200));
c0.session.messageTransfer(arg::content=Message("b", "q"));
- BOOST_CHECK_EQUAL(browse(c1, "q", 2), list_of<string>("a")("b"));
- sys::usleep(300*1000);
+ c0.session.messageTransfer(arg::content=ttlMessage("x", "p", 10000));
+ c0.session.messageTransfer(arg::content=Message("y", "p"));
+ cluster.add();
+ Client c2(cluster[1], "c2");
+
+ BOOST_CHECK_EQUAL(browse(c0, "p", 2), list_of<string>("x")("y"));
+ BOOST_CHECK_EQUAL(browse(c1, "p", 2), list_of<string>("x")("y"));
+ BOOST_CHECK_EQUAL(browse(c2, "p", 2), list_of<string>("x")("y"));
+
+ sys::usleep(200*1000);
BOOST_CHECK_EQUAL(browse(c0, "q", 1), list_of<string>("b"));
BOOST_CHECK_EQUAL(browse(c1, "q", 1), list_of<string>("b"));
+ BOOST_CHECK_EQUAL(browse(c2, "q", 1), list_of<string>("b"));
}
QPID_AUTO_TEST_CASE(testSequenceOptions) {
Modified: qpid/trunk/qpid/cpp/src/tests/start_cluster
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/start_cluster?rev=751760&r1=751759&r2=751760&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/start_cluster (original)
+++ qpid/trunk/qpid/cpp/src/tests/start_cluster Mon Mar 9 17:03:40 2009
@@ -28,7 +28,6 @@
echo $* | newgrp ais
}
-test -f cluster.ports && { echo "cluster.ports file already exists" ; exit 1; }
rm -f cluster*.log
SIZE=${1:-1}; shift
CLUSTER=`pwd` # Cluster name=pwd, avoid clashes.
Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=751760&r1=751759&r2=751760&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Mon Mar 9 17:03:40 2009
@@ -132,7 +132,6 @@
<control name="membership" code="0x21" label="Cluster membership details.">
<field name="joiners" type="map"/> <!-- member-id -> URL -->
<field name="members" type="map"/> <!-- member-id -> state -->
- <field name="frame-id" type="uint64"/>> <!-- Frame id counter value -->
</control>
<!-- Set the position of a replicated queue. -->
@@ -140,11 +139,12 @@
<field name="queue" type="str8"/>
<field name="position" type="sequence-no"/>
</control>
-
+
<!-- Replicate encoded exchanges/queues. -->
<control name="exchange" code="0x31"><field name="encoded"
type="str32"/></control>
<control name="queue" code="0x32"><field name="encoded"
type="str32"/></control>
-
+ <!-- Set expiry-id for subsequent messages. -->
+ <control name="expiry-id" code="0x33"><field name="expiry-id"
type="uint64"/></control>
</class>
</amqp>
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]