Author: aconway Date: Thu Mar 5 13:28:14 2009 New Revision: 750456 URL: http://svn.apache.org/viewvc?rev=750456&view=rev Log: cluster: fix delivery-property.exchange-name set on updated messages.
Logging improvements, useful for debugging: - qpid/SessionState.cpp: show frame bodies with command IDs. - assign cluster-wide id number to each Event. Added: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h (with props) Modified: qpid/trunk/qpid/cpp/src/cluster.mk qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp qpid/trunk/qpid/cpp/xml/cluster.xml Modified: qpid/trunk/qpid/cpp/src/cluster.mk URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.mk?rev=750456&r1=750455&r2=750456&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/cluster.mk (original) +++ qpid/trunk/qpid/cpp/src/cluster.mk Thu Mar 5 13:28:14 2009 @@ -64,6 +64,7 @@ qpid/cluster/ExpiryPolicy.cpp \ qpid/cluster/FailoverExchange.cpp \ qpid/cluster/FailoverExchange.h \ + qpid/cluster/UpdateExchange.h \ qpid/cluster/Multicaster.cpp \ qpid/cluster/Multicaster.h \ qpid/cluster/McastFrameHandler.h \ Modified: qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp?rev=750456&r1=750455&r2=750456&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp Thu Mar 5 13:28:14 2009 @@ -113,7 +113,8 @@ void SessionState::senderRecord(const AMQFrame& f) { if (isControl(f)) return; // Ignore control frames. - QPID_LOG_IF(debug, f.getMethod(), getId() << ": sent cmd " << sender.sendPoint.command << ": " << *f.getMethod()); + QPID_LOG(trace, getId() << ": sent cmd " << sender.sendPoint.command << ": " << *f.getBody()); + stateful = true; if (timeout) sender.replayList.push_back(f); sender.unflushedSize += f.encodedSize(); @@ -193,8 +194,8 @@ receiver.received = receiver.expected; receiver.incomplete += receiverGetCurrent(); } - QPID_LOG_IF(debug, f.getMethod(), getId() << ": recv cmd " << receiverGetCurrent() << ": " << *f.getMethod()); - QPID_LOG_IF(debug, !firstTime, "Ignoring duplicate frame: " << receiverGetCurrent() << ": " << f); + QPID_LOG(trace, getId() << ": recv cmd " << receiverGetCurrent() << ": " << *f.getBody()); + if (!firstTime) QPID_LOG(trace, "Ignoring duplicate frame."); return firstTime; } Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=750456&r1=750455&r2=750456&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Thu Mar 5 13:28:14 2009 @@ -275,3 +275,7 @@ { return b->queue == queue; } + +void Exchange::setProperties(const boost::intrusive_ptr<Message>& msg) { + msg->getProperties<DeliveryProperties>()->setExchange(getName()); +} Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h?rev=750456&r1=750455&r2=750456&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h Thu Mar 5 13:28:14 2009 @@ -139,8 +139,9 @@ virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0; virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0; virtual bool isBound(Queue::shared_ptr queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args) = 0; + virtual void setProperties(const boost::intrusive_ptr<Message>&); virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0; - + //PersistableExchange: void setPersistenceId(uint64_t id) const; uint64_t getPersistenceId() const { return persistenceId; } Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=750456&r1=750455&r2=750456&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Thu Mar 5 13:28:14 2009 @@ -355,20 +355,12 @@ } void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { + msg->setTimestamp(getSession().getBroker().getExpiryPolicy()); + std::string exchangeName = msg->getExchangeName(); - //TODO: the following should be hidden behind message (using MessageAdapter or similar) - - if (msg->isA<MessageTransferBody>()) { - // Do not replace the delivery-properties.exchange if it is is already set. - // This is used internally (by the cluster) to force the exchange name on a message. - // The client library ensures this is always empty for messages from normal clients. - if (!msg->hasProperties<DeliveryProperties>() || msg->getProperties<DeliveryProperties>()->getExchange().empty()) - msg->getProperties<DeliveryProperties>()->setExchange(exchangeName); - msg->setTimestamp(getSession().getBroker().getExpiryPolicy()); - } - if (!cacheExchange || cacheExchange->getName() != exchangeName){ + if (!cacheExchange || cacheExchange->getName() != exchangeName) cacheExchange = session.getBroker().getExchanges().get(exchangeName); - } + cacheExchange->setProperties(msg); /* verify the userid if specified: */ std::string id = Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=750456&r1=750455&r2=750456&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Thu Mar 5 13:28:14 2009 @@ -21,6 +21,7 @@ #include "Connection.h" #include "UpdateClient.h" #include "FailoverExchange.h" +#include "UpdateExchange.h" #include "qpid/assert.h" #include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h" @@ -106,13 +107,13 @@ "Error delivering frames", poller), expiryPolicy(new ExpiryPolicy(boost::bind(&Cluster::isLeader, this), mcast, self, broker.getTimer())), + eventId(0), frameId(0), initialized(false), state(INIT), connections(*this), lastSize(0), - lastBroker(false), - sequence(0) + lastBroker(false) { mAgent = ManagementAgent::Singleton::getInstance(); if (mAgent != 0){ @@ -122,7 +123,13 @@ mgmtObject->set_status("JOINING"); } + // Failover exchange provides membership updates to clients. failoverExchange.reset(new FailoverExchange(this)); + broker.getExchanges().registerExchange(failoverExchange); + + // Update exchange is used during updates to replicate messages without modifying delivery-properties.exchange. + broker.getExchanges().registerExchange(boost::shared_ptr<broker::Exchange>(new UpdateExchange(this))); + if (settings.quorum) quorum.init(); cpg.join(name); // pump the CPG dispatch manually till we get initialized. @@ -212,7 +219,6 @@ MemberId from(nodeid, pid); framing::Buffer buf(static_cast<char*>(msg), msg_len); Event e(Event::decodeCopy(from, buf)); - e.setSequence(sequence++); if (from == self) // Record self-deliveries for flow control. mcast.selfDeliver(e); deliver(e); @@ -225,34 +231,40 @@ } // Handler for deliverEventQueue -void Cluster::deliveredEvent(const Event& e) { - QPID_LATENCY_RECORD("delivered event queue", e); +void Cluster::deliveredEvent(const Event& event) { + Event e(event); Mutex::ScopedLock l(lock); + if (state >= CATCHUP) { + e.setId(++eventId); + QPID_LOG(trace, *this << " DLVR: " << e); + } if (e.isCluster()) { // Cluster control, process in this thread. - AMQFrame frame(e.getFrame()); + EventFrame ef(e, e.getFrame()); + QPID_LOG(trace, *this << " DLVR: " << ef); ClusterDispatcher dispatch(*this, e.getConnectionId().getMember(), l); - if (!framing::invoke(dispatch, *frame.getBody()).wasHandled()) + if (!framing::invoke(dispatch, *ef.frame.getBody()).wasHandled()) throw Exception(QPID_MSG("Invalid cluster control")); } - else if (state >= CATCHUP) { // Connection frame, push onto deliver queue. - if (e.getType() == CONTROL) + else if (state >= CATCHUP) { // Handle connection frames + if (e.getType() == CONTROL) { connectionFrame(EventFrame(e, e.getFrame())); + } else connections.decode(e, e.getData()); } - else // connection frame && state < CATCHUP. Drop. - QPID_LOG(trace, *this << " DROP: " << e); + // Drop connection frames while state < CATCHUP } // Handler for deliverFrameQueue -void Cluster::deliveredFrame(const EventFrame& e) { +void Cluster::deliveredFrame(const EventFrame& event) { Mutex::ScopedLock l(lock); // TODO aconway 2009-03-02: don't need this lock? + EventFrame e(event); assert(!e.isCluster()); // Only connection frames on this queue. - QPID_LOG(trace, *this << " DLVR: " << e); - if (e.type == DATA) // Sequence number to identify data frames. - const_cast<AMQFrame&>(e.frame).setClusterId(frameId++); + QPID_LOG(trace, *this << " DLVR: " << e); + if (e.type == DATA) // Add cluster-id to to data frames. + e.frame.setClusterId(frameId++); boost::intrusive_ptr<Connection> connection = connections.get(e.connectionId); - if (connection) // Ignore frames to closed local connections. + if (connection) // Ignore frames to closed local connections. connection->deliveredFrame(e); } @@ -389,6 +401,10 @@ // Stop processing the deliveredEventQueue in order to send or // recieve an update. deliverEventQueue.stop(); + + // FIXME aconway 2009-03-04: if frame queue is re-enabled, we must + // also wait for it to be empty before we are stalled, so that + // our local model is up-to-date to give an update. } void Cluster::unstall(Lock&) { @@ -434,17 +450,18 @@ cs.password = settings.password; cs.mechanism = settings.mechanism; updateThread = Thread( - new UpdateClient(self, updatee, url, broker, map, frameId, connections.values(), + new UpdateClient(self, updatee, url, broker, map, eventId, frameId, connections.values(), boost::bind(&Cluster::updateOutDone, this), boost::bind(&Cluster::updateOutError, this, _1), cs)); } // Called in update thread. -void Cluster::updateInDone(const ClusterMap& m, uint64_t fid) { +void Cluster::updateInDone(const ClusterMap& m, uint64_t eventId_, uint64_t frameId_) { Lock l(lock); updatedMap = m; - frameId = fid; + eventId = eventId_; + frameId = frameId_; checkUpdateIn(l); } @@ -601,9 +618,11 @@ } void Cluster::connectionFrame(const EventFrame& frame) { - // FIXME aconway 2009-03-02: bypassing deliverFrameQueue to avoid race condition. - // Measure performance impact, restore with better locking. + // FIXME aconway 2009-03-02: bypass deliverFrameQueue to avoid race condition. + // Measure performance impact & review. + // // deliverFrameQueue.push(frame); + // deliveredFrame(frame); } Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=750456&r1=750455&r2=750456&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Thu Mar 5 13:28:14 2009 @@ -88,7 +88,7 @@ void leave(); // Update completed - called in update thread - void updateInDone(const ClusterMap&, uint64_t frameId); + void updateInDone(const ClusterMap&, uint64_t eventId, uint64_t frameId); MemberId getId() const; broker::Broker& getBroker() const; @@ -214,6 +214,7 @@ // Used only in deliveredFrame thread ClusterMap::Set elders; boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; + uint64_t eventId; // FIXME aconway 2009-03-04: review use for thread safety frame-q thread re-enabled. uint64_t frameId; // Used only during initialization @@ -238,7 +239,6 @@ ClusterMap map; size_t lastSize; bool lastBroker; - uint64_t sequence; // Update related sys::Thread updateThread; Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=750456&r1=750455&r2=750456&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Thu Mar 5 13:28:14 2009 @@ -138,7 +138,6 @@ broker->setConnectionFactory( boost::shared_ptr<sys::ConnectionCodec::Factory>( new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster))); - broker->getExchanges().registerExchange(cluster->getFailoverExchange()); ManagementBroker* mgmt = dynamic_cast<ManagementBroker*>(ManagementAgent::Singleton::getInstance()); if (mgmt) { std::auto_ptr<IdAllocator> allocator(new UpdateClientIdAllocator()); Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=750456&r1=750455&r2=750456&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Thu Mar 5 13:28:14 2009 @@ -280,6 +280,7 @@ const SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete) { + sessionState().setState( replayStart, sendCommandPoint, @@ -299,9 +300,9 @@ clusterDecoder.setFragment(fragment.data(), fragment.size()); } -void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameId) { +void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t eventId, uint64_t frameId) { QPID_LOG(debug, cluster << " incoming update complete on connection " << *this); - cluster.updateInDone(ClusterMap(joiners, members), frameId); + cluster.updateInDone(ClusterMap(joiners, members), eventId, frameId); self.second = 0; // Mark this as completed update connection. } Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=750456&r1=750455&r2=750456&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Thu Mar 5 13:28:14 2009 @@ -123,7 +123,7 @@ void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment); - void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameId); + void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t eventId, uint64_t frameId); void deliveryRecord(const std::string& queue, const framing::SequenceNumber& position, Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp?rev=750456&r1=750455&r2=750456&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp Thu Mar 5 13:28:14 2009 @@ -44,7 +44,7 @@ ; EventHeader::EventHeader(EventType t, const ConnectionId& c, size_t s) - : type(t), connectionId(c), size(s), sequence(0) {} + : type(t), connectionId(c), size(s), id(0) {} Event::Event() {} @@ -128,8 +128,7 @@ } std::ostream& operator << (std::ostream& o, const EventHeader& e) { - o << "[event " << e.getConnectionId() << "/" << e.getSequence() - << " " << e.getType() << " " << e.getSize() << " bytes]"; + o << "Event[id=" << e.getId() << " connection=" << e.getConnectionId() << " " << e.getType() << " " << e.getSize() << " bytes]"; return o; } Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h?rev=750456&r1=750455&r2=750456&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h Thu Mar 5 13:28:14 2009 @@ -57,8 +57,8 @@ /** Size of header + payload. */ size_t getStoreSize() { return size + HEADER_SIZE; } - uint64_t getSequence() const { return sequence; } - void setSequence(uint64_t n) { sequence = n; } + uint64_t getId() const { return id; } + void setId(uint64_t n) { id = n; } bool isCluster() const { return connectionId.getNumber() == 0; } bool isConnection() const { return connectionId.getNumber() != 0; } @@ -69,7 +69,7 @@ EventType type; ConnectionId connectionId; size_t size; - uint64_t sequence; + uint64_t id; }; /** Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp?rev=750456&r1=750455&r2=750456&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp Thu Mar 5 13:28:14 2009 @@ -24,16 +24,16 @@ namespace qpid { namespace cluster { -EventFrame::EventFrame() : sequence(0) {} +EventFrame::EventFrame() : eventId(0) {} EventFrame::EventFrame(const EventHeader& e, const framing::AMQFrame& f, int rc) - : connectionId(e.getConnectionId()), frame(f), sequence(e.getSequence()), readCredit(rc), type(e.getType()) + : connectionId(e.getConnectionId()), frame(f), eventId(e.getId()), readCredit(rc), type(e.getType()) { QPID_LATENCY_INIT(frame); } std::ostream& operator<<(std::ostream& o, const EventFrame& e) { - return o << e.connectionId << "/" << e.sequence << " " << e.frame << " rc=" << e.readCredit << " type=" << e.type; + return o << e.frame << "(from event " << e.eventId << " read-credit=" << e.readCredit << ")"; } }} // namespace qpid::cluster Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h?rev=750456&r1=750455&r2=750456&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h Thu Mar 5 13:28:14 2009 @@ -49,14 +49,14 @@ // True if this frame follows immediately after frame e. bool follows(const EventFrame& e) const { - return sequence == e.sequence || (sequence == e.sequence+1 && e.readCredit); + return eventId == e.eventId || (eventId == e.eventId+1 && e.readCredit); } - bool operator<(const EventFrame& e) const { return sequence < e.sequence; } + bool operator<(const EventFrame& e) const { return eventId < e.eventId; } ConnectionId connectionId; framing::AMQFrame frame; - uint64_t sequence; + uint64_t eventId; int readCredit; ///< last frame in an event, give credit when processed. EventType type; }; Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=750456&r1=750455&r2=750456&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Thu Mar 5 13:28:14 2009 @@ -70,17 +70,12 @@ // Called in write thread when the IO layer has no more data to write. // We do nothing in the write thread, we run doOutput only on delivery // of doOutput requests. -bool OutputInterceptor::doOutput() { - QPID_LOG(trace, parent << " write idle."); - return false; -} +bool OutputInterceptor::doOutput() { return false; } // Delivery of doOutput allows us to run the real connection doOutput() // which tranfers frames to the codec for writing. // void OutputInterceptor::deliverDoOutput(size_t requested) { - QPID_LATENCY_RECORD("deliver do-output", *this); - QPID_LATENCY_CLEAR(*this); size_t buf = getBuffered(); if (parent.isLocal()) writeEstimate.delivered(requested, sent, buf); // Update the estimate. @@ -91,9 +86,7 @@ moreOutput = parent.getBrokerConnection().doOutput(); } while (sent < requested && moreOutput); sent += buf; // Include buffered data in the sent total. - - QPID_LOG(trace, "Delivered doOutput: requested=" << requested << " output=" << sent << " more=" << moreOutput); - + QPID_LOG(trace, parent << " delivereDoOutput: requested=" << requested << " sent=" << sent << " more=" << moreOutput); if (parent.isLocal() && moreOutput) { QPID_LOG(trace, parent << " deliverDoOutput - sending doOutput, more output available."); sendDoOutput(); Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=750456&r1=750455&r2=750456&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Thu Mar 5 13:28:14 2009 @@ -86,14 +86,14 @@ // TODO aconway 2008-09-24: optimization: update connections/sessions in parallel. UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url, - broker::Broker& broker, const ClusterMap& m, uint64_t frameId_, + broker::Broker& broker, const ClusterMap& m, uint64_t eventId_, uint64_t frameId_, const Cluster::Connections& cons, const boost::function<void()>& ok, const boost::function<void(const std::exception&)>& fail, const client::ConnectionSettings& cs ) : updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m), - frameId(frameId_), connections(cons), + eventId(eventId_), frameId(frameId_), connections(cons), connection(catchUpConnection()), shadowConnection(catchUpConnection()), done(ok), failed(fail), connectionSettings(cs) { @@ -104,7 +104,7 @@ UpdateClient::~UpdateClient() {} // Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges. -const std::string UpdateClient::UPDATE("qpid.qpid-update"); +const std::string UpdateClient::UPDATE("qpid.cluster-update"); void UpdateClient::run() { try { @@ -120,9 +120,6 @@ QPID_LOG(debug, updaterId << " updating state to " << updateeId << " at " << updateeUrl); Broker& b = updaterBroker; b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1)); - - // Update exchange is used to route messages to the proper queue without modifying routing key. - session.exchangeDeclare(arg::exchange=UPDATE, arg::type="fanout", arg::autoDelete=true); b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueue, this, _1)); // Update queue is used to transfer acquired messages that are no longer on their original queue. session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true); @@ -133,6 +130,7 @@ ClusterConnectionMembershipBody membership; map.toMethodBody(membership); + membership.setEventId(eventId); membership.setFrameId(frameId); AMQFrame frame(membership); client::ConnectionAccess::getImpl(connection)->handle(frame); @@ -274,7 +272,7 @@ SequenceNumber received = ss->receiverGetReceived().command; if (inProgress) --received; - + // Reset command-sequence state. proxy.sessionState( ss->senderGetReplayPoint().command, Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h?rev=750456&r1=750455&r2=750456&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h Thu Mar 5 13:28:14 2009 @@ -63,7 +63,7 @@ static const std::string UPDATE; // Name for special update queue and exchange. UpdateClient(const MemberId& updater, const MemberId& updatee, const Url&, - broker::Broker& donor, const ClusterMap& map, uint64_t sequence, + broker::Broker& donor, const ClusterMap& map, uint64_t eventId, uint64_t frameId, const std::vector<boost::intrusive_ptr<Connection> >& , const boost::function<void()>& done, const boost::function<void(const std::exception&)>& fail, @@ -92,6 +92,7 @@ Url updateeUrl; broker::Broker& updaterBroker; ClusterMap map; + uint64_t eventId; uint64_t frameId; std::vector<boost::intrusive_ptr<Connection> > connections; client::Connection connection, shadowConnection; Added: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h?rev=750456&view=auto ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h (added) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h Thu Mar 5 13:28:14 2009 @@ -0,0 +1,45 @@ +#ifndef QPID_CLUSTER_UPDATEEXCHANGE_H +#define QPID_CLUSTER_UPDATEEXCHANGE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "UpdateClient.h" +#include "qpid/broker/FanOutExchange.h" + + +namespace qpid { +namespace cluster { + +/** + * A keyless exchange (like fanout exchange) that does not modify deliver-properties.exchange + * on messages. + */ +class UpdateExchange : public broker::FanOutExchange +{ + public: + UpdateExchange(management::Manageable* parent) : broker::Exchange(UpdateClient::UPDATE, parent), broker::FanOutExchange(UpdateClient::UPDATE, parent) {} + void setProperties(const boost::intrusive_ptr<broker::Message>&) {} +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_UPDATEEXCHANGE_H*/ Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h ------------------------------------------------------------------------------ svn:eol-style = native Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=750456&r1=750455&r2=750456&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Thu Mar 5 13:28:14 2009 @@ -509,10 +509,11 @@ c0.session.queueDeclare("q"); c0.session.messageTransfer(arg::content=Message("foo","q")); c0.session.messageTransfer(arg::content=Message("bar","q")); + while (c0.session.queueQuery("q").getMessageCount() != 2) sys::usleep(1000); // Wait for message to show up on broker 0. - // Add a new broker, it should catch up. + // Add a new broker, it will catch up. cluster.add(); // Do some work post-add @@ -530,6 +531,7 @@ BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "foo"); + BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), ""); BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "bar"); BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); Modified: qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp?rev=750456&r1=750455&r2=750456&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp Thu Mar 5 13:28:14 2009 @@ -339,7 +339,6 @@ { Connection connection; connection.open ( "127.0.0.1", newbie_port ); - sleep ( 2 ); connection.close(); newbie = 0; // He's no newbie anymore! return true; @@ -368,20 +367,19 @@ module << moduleDir << "/cluster.so"; path << srcRoot << "/qpidd"; prefix << "soak-" << brokerId; - - std::vector<std::string> argv = - list_of<string> ("qpidd") - ("--no-module-dir") - ("--load-module=cluster.so") - ("--cluster-name") - (clusterName) - ("--auth=no") - ("--no-data-dir") - ("--mgmt-enable=no") - ("--log-prefix") - (prefix.str()) - ("--log-to-file") - ("/tmp/qpidd.log"); + std::vector<std::string> argv = list_of<string> + ("qpidd") + ("--no-module-dir") + ("--load-module=cluster.so") + ("--cluster-name") + (clusterName) + ("--auth=no") + ("--no-data-dir") + ("--mgmt-enable=no") + ("--log-prefix") + (prefix.str()) + ("--log-to-file") + (prefix.str()+".log"); newbie = new ForkedBroker ( argv ); newbie_port = newbie->getPort(); Modified: qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp?rev=750456&r1=750455&r2=750456&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp Thu Mar 5 13:28:14 2009 @@ -72,7 +72,7 @@ sender.send(message); if (count > reportFrequency && !(sent % reportFrequency)) { if ( verbosity > 0 ) - std::cout << "sent " << sent << " of " << count << std::endl; + std::cout << "Sender sent " << sent << " of " << count << std::endl; } } message.setData("That's all, folks!"); Modified: qpid/trunk/qpid/cpp/xml/cluster.xml URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=750456&r1=750455&r2=750456&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/xml/cluster.xml (original) +++ qpid/trunk/qpid/cpp/xml/cluster.xml Thu Mar 5 13:28:14 2009 @@ -132,6 +132,7 @@ <control name="membership" code="0x21" label="Cluster membership details."> <field name="joiners" type="map"/> <!-- member-id -> URL --> <field name="members" type="map"/> <!-- member-id -> state --> + <field name="event-id" type="uint64"/>> <!-- Event id counter value --> <field name="frame-id" type="uint64"/>> <!-- Frame id counter value --> </control> --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org