Author: aconway
Date: Thu Mar  5 13:28:14 2009
New Revision: 750456

URL: http://svn.apache.org/viewvc?rev=750456&view=rev
Log:
cluster: fix delivery-property.exchange-name set on updated messages.

Logging improvements, useful for debugging:
 - qpid/SessionState.cpp: show frame bodies with command IDs.
 - assign cluster-wide id number to each Event.

Added:
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h   (with props)
Modified:
    qpid/trunk/qpid/cpp/src/cluster.mk
    qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
    qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
    qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp
    qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp
    qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: qpid/trunk/qpid/cpp/src/cluster.mk
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.mk?rev=750456&r1=750455&r2=750456&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ qpid/trunk/qpid/cpp/src/cluster.mk Thu Mar  5 13:28:14 2009
@@ -64,6 +64,7 @@
   qpid/cluster/ExpiryPolicy.cpp                        \
   qpid/cluster/FailoverExchange.cpp            \
   qpid/cluster/FailoverExchange.h              \
+  qpid/cluster/UpdateExchange.h                        \
   qpid/cluster/Multicaster.cpp                 \
   qpid/cluster/Multicaster.h                   \
   qpid/cluster/McastFrameHandler.h             \

Modified: qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp?rev=750456&r1=750455&r2=750456&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp Thu Mar  5 13:28:14 2009
@@ -113,7 +113,8 @@
 
 void SessionState::senderRecord(const AMQFrame& f) {
     if (isControl(f)) return;   // Ignore control frames.
-    QPID_LOG_IF(debug, f.getMethod(), getId() << ": sent cmd " << 
sender.sendPoint.command << ": " << *f.getMethod());
+    QPID_LOG(trace, getId() << ": sent cmd " << sender.sendPoint.command << ": 
" << *f.getBody());
+
     stateful = true;
     if (timeout) sender.replayList.push_back(f);
     sender.unflushedSize += f.encodedSize();
@@ -193,8 +194,8 @@
         receiver.received = receiver.expected;
         receiver.incomplete += receiverGetCurrent();
     }
-    QPID_LOG_IF(debug, f.getMethod(), getId() << ": recv cmd " << 
receiverGetCurrent() << ": " << *f.getMethod());
-    QPID_LOG_IF(debug, !firstTime, "Ignoring duplicate frame: " << 
receiverGetCurrent() << ": " << f);
+    QPID_LOG(trace, getId() << ": recv cmd " << receiverGetCurrent() << ": " 
<< *f.getBody());
+    if (!firstTime) QPID_LOG(trace, "Ignoring duplicate frame.");
     return firstTime;
 }
     

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=750456&r1=750455&r2=750456&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Thu Mar  5 13:28:14 2009
@@ -275,3 +275,7 @@
 {
     return b->queue == queue;
 }
+
+void Exchange::setProperties(const boost::intrusive_ptr<Message>& msg) {
+    msg->getProperties<DeliveryProperties>()->setExchange(getName());
+}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h?rev=750456&r1=750455&r2=750456&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h Thu Mar  5 13:28:14 2009
@@ -139,8 +139,9 @@
     virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, 
const qpid::framing::FieldTable* args) = 0;
     virtual bool unbind(Queue::shared_ptr queue, const std::string& 
routingKey, const qpid::framing::FieldTable* args) = 0;
     virtual bool isBound(Queue::shared_ptr queue, const std::string* const 
routingKey, const qpid::framing::FieldTable* const args) = 0;
+    virtual void setProperties(const boost::intrusive_ptr<Message>&);
     virtual void route(Deliverable& msg, const std::string& routingKey, const 
qpid::framing::FieldTable* args) = 0;
-
+    
     //PersistableExchange:
     void setPersistenceId(uint64_t id) const;
     uint64_t getPersistenceId() const { return persistenceId; }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=750456&r1=750455&r2=750456&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Thu Mar  5 13:28:14 
2009
@@ -355,20 +355,12 @@
 }
 
 void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
+    msg->setTimestamp(getSession().getBroker().getExpiryPolicy());
+    
     std::string exchangeName = msg->getExchangeName();
-    //TODO: the following should be hidden behind message (using 
MessageAdapter or similar)
-
-    if (msg->isA<MessageTransferBody>()) {
-        // Do not replace the delivery-properties.exchange if it is is already 
set.
-        // This is used internally (by the cluster) to force the exchange name 
on a message.
-        // The client library ensures this is always empty for messages from 
normal clients.
-        if (!msg->hasProperties<DeliveryProperties>() || 
msg->getProperties<DeliveryProperties>()->getExchange().empty())
-            
msg->getProperties<DeliveryProperties>()->setExchange(exchangeName);
-        msg->setTimestamp(getSession().getBroker().getExpiryPolicy());
-    }
-    if (!cacheExchange || cacheExchange->getName() != exchangeName){
+    if (!cacheExchange || cacheExchange->getName() != exchangeName)
         cacheExchange = session.getBroker().getExchanges().get(exchangeName);
-    }
+    cacheExchange->setProperties(msg);
 
     /* verify the userid if specified: */
     std::string id =

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=750456&r1=750455&r2=750456&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Thu Mar  5 13:28:14 2009
@@ -21,6 +21,7 @@
 #include "Connection.h"
 #include "UpdateClient.h"
 #include "FailoverExchange.h"
+#include "UpdateExchange.h"
 
 #include "qpid/assert.h"
 #include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h"
@@ -106,13 +107,13 @@
                       "Error delivering frames",
                       poller),
     expiryPolicy(new ExpiryPolicy(boost::bind(&Cluster::isLeader, this), 
mcast, self, broker.getTimer())),
+    eventId(0),
     frameId(0),
     initialized(false),
     state(INIT),
     connections(*this),
     lastSize(0),
-    lastBroker(false),
-    sequence(0)
+    lastBroker(false)
 {
     mAgent = ManagementAgent::Singleton::getInstance();
     if (mAgent != 0){
@@ -122,7 +123,13 @@
         mgmtObject->set_status("JOINING");
     }
 
+    // Failover exchange provides membership updates to clients.
     failoverExchange.reset(new FailoverExchange(this));
+    broker.getExchanges().registerExchange(failoverExchange);
+
+    // Update exchange is used during updates to replicate messages without 
modifying delivery-properties.exchange.
+    
broker.getExchanges().registerExchange(boost::shared_ptr<broker::Exchange>(new 
UpdateExchange(this)));
+
     if (settings.quorum) quorum.init();
     cpg.join(name);
     // pump the CPG dispatch manually till we get initialized. 
@@ -212,7 +219,6 @@
     MemberId from(nodeid, pid);
     framing::Buffer buf(static_cast<char*>(msg), msg_len);
     Event e(Event::decodeCopy(from, buf));
-    e.setSequence(sequence++);
     if (from == self)  // Record self-deliveries for flow control.
         mcast.selfDeliver(e);
     deliver(e);
@@ -225,34 +231,40 @@
 }
 
 // Handler for deliverEventQueue
-void Cluster::deliveredEvent(const Event& e) {
-    QPID_LATENCY_RECORD("delivered event queue", e);
+void Cluster::deliveredEvent(const Event& event) {
+    Event e(event);
     Mutex::ScopedLock l(lock);
+    if (state >= CATCHUP) {
+        e.setId(++eventId);
+        QPID_LOG(trace, *this << " DLVR: " << e);
+    }
     if (e.isCluster()) {        // Cluster control, process in this thread.
-        AMQFrame frame(e.getFrame());
+        EventFrame ef(e, e.getFrame());
+        QPID_LOG(trace, *this << " DLVR:  " << ef);
         ClusterDispatcher dispatch(*this, e.getConnectionId().getMember(), l);
-        if (!framing::invoke(dispatch, *frame.getBody()).wasHandled())
+        if (!framing::invoke(dispatch, *ef.frame.getBody()).wasHandled())
             throw Exception(QPID_MSG("Invalid cluster control"));
     }
-    else if (state >= CATCHUP) { // Connection frame, push onto deliver queue.
-        if (e.getType() == CONTROL)
+    else if (state >= CATCHUP) { // Handle connection frames  
+        if (e.getType() == CONTROL) {
             connectionFrame(EventFrame(e, e.getFrame()));
+        }
         else
             connections.decode(e, e.getData());
     }
-    else                        // connection frame && state < CATCHUP. Drop.
-        QPID_LOG(trace, *this << " DROP: " << e);
+    // Drop connection frames while state < CATCHUP
 }
 
 // Handler for deliverFrameQueue
-void Cluster::deliveredFrame(const EventFrame& e) {
+void Cluster::deliveredFrame(const EventFrame& event) {
     Mutex::ScopedLock l(lock);  // TODO aconway 2009-03-02: don't need this 
lock?
+    EventFrame e(event);
     assert(!e.isCluster());     // Only connection frames on this queue.
-    QPID_LOG(trace, *this << " DLVR: " << e);
-    if (e.type == DATA)    // Sequence number to identify data frames.
-        const_cast<AMQFrame&>(e.frame).setClusterId(frameId++);
+    QPID_LOG(trace, *this << " DLVR:  " << e);
+    if (e.type == DATA)         // Add cluster-id to data frames.
+        e.frame.setClusterId(frameId++);
     boost::intrusive_ptr<Connection> connection = 
connections.get(e.connectionId);
-    if (connection)         // Ignore frames to closed local connections.
+    if (connection)      // Ignore frames to closed local connections.
         connection->deliveredFrame(e);
 }
   
@@ -389,6 +401,10 @@
     // Stop processing the deliveredEventQueue in order to send or
     // recieve an update.
     deliverEventQueue.stop();
+
+    // FIXME aconway 2009-03-04: if frame queue is re-enabled, we must
+    // also wait for it to be empty before we are stalled, so that
+    // our local model is up-to-date to give an update.
 }
 
 void Cluster::unstall(Lock&) {
@@ -434,17 +450,18 @@
     cs.password = settings.password;
     cs.mechanism = settings.mechanism;
     updateThread = Thread(
-        new UpdateClient(self, updatee, url, broker, map, frameId, 
connections.values(),
+        new UpdateClient(self, updatee, url, broker, map, eventId, frameId, 
connections.values(),
                          boost::bind(&Cluster::updateOutDone, this),
                          boost::bind(&Cluster::updateOutError, this, _1),
                          cs));
 }
 
 // Called in update thread.
-void Cluster::updateInDone(const ClusterMap& m, uint64_t fid) {
+void Cluster::updateInDone(const ClusterMap& m, uint64_t eventId_, uint64_t 
frameId_) {
     Lock l(lock);
     updatedMap = m;
-    frameId = fid;
+    eventId = eventId_;
+    frameId = frameId_;
     checkUpdateIn(l);
 }
 
@@ -601,9 +618,11 @@
 }
 
 void Cluster::connectionFrame(const EventFrame& frame) {
-    // FIXME aconway 2009-03-02: bypassing deliverFrameQueue to avoid race 
condition.
-    // Measure performance impact, restore with better locking.
+    // FIXME aconway 2009-03-02: bypass deliverFrameQueue to avoid race 
condition.
+    // Measure performance impact & review.
+    // 
     // deliverFrameQueue.push(frame);
+    // 
     deliveredFrame(frame);
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=750456&r1=750455&r2=750456&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Thu Mar  5 13:28:14 2009
@@ -88,7 +88,7 @@
     void leave();
 
     // Update completed - called in update thread
-    void updateInDone(const ClusterMap&, uint64_t frameId);
+    void updateInDone(const ClusterMap&, uint64_t eventId, uint64_t frameId);
 
     MemberId getId() const;
     broker::Broker& getBroker() const;
@@ -214,6 +214,7 @@
     // Used only in deliveredFrame thread
     ClusterMap::Set elders;
     boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
+    uint64_t eventId;           // FIXME aconway 2009-03-04: review use for 
thread safety if the frame-q thread is re-enabled.
     uint64_t frameId;
 
     // Used only during initialization
@@ -238,7 +239,6 @@
     ClusterMap map;
     size_t lastSize;
     bool lastBroker;
-    uint64_t sequence;
 
     //     Update related
     sys::Thread updateThread;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=750456&r1=750455&r2=750456&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Thu Mar  5 13:28:14 
2009
@@ -138,7 +138,6 @@
         broker->setConnectionFactory(
             boost::shared_ptr<sys::ConnectionCodec::Factory>(
                 new ConnectionCodec::Factory(broker->getConnectionFactory(), 
*cluster)));
-        
broker->getExchanges().registerExchange(cluster->getFailoverExchange());
         ManagementBroker* mgmt = 
dynamic_cast<ManagementBroker*>(ManagementAgent::Singleton::getInstance());
         if (mgmt) {
             std::auto_ptr<IdAllocator> allocator(new 
UpdateClientIdAllocator());

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=750456&r1=750455&r2=750456&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Thu Mar  5 13:28:14 2009
@@ -280,6 +280,7 @@
     const SequenceSet& unknownCompleted,
     const SequenceSet& receivedIncomplete)
 {
+    
     sessionState().setState(
         replayStart,
         sendCommandPoint,
@@ -299,9 +300,9 @@
     clusterDecoder.setFragment(fragment.data(), fragment.size());
 }
 
-void Connection::membership(const FieldTable& joiners, const FieldTable& 
members, uint64_t frameId) {
+void Connection::membership(const FieldTable& joiners, const FieldTable& 
members, uint64_t eventId, uint64_t frameId) {
     QPID_LOG(debug, cluster << " incoming update complete on connection " << 
*this);
-    cluster.updateInDone(ClusterMap(joiners, members), frameId);
+    cluster.updateInDone(ClusterMap(joiners, members), eventId, frameId);
     self.second = 0;        // Mark this as completed update connection.
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=750456&r1=750455&r2=750456&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Thu Mar  5 13:28:14 2009
@@ -123,7 +123,7 @@
     
     void shadowReady(uint64_t memberId, uint64_t connectionId, const 
std::string& username, const std::string& fragment);
 
-    void membership(const framing::FieldTable&, const framing::FieldTable&, 
uint64_t frameId);
+    void membership(const framing::FieldTable&, const framing::FieldTable&, 
uint64_t eventId, uint64_t frameId);
 
     void deliveryRecord(const std::string& queue,
                         const framing::SequenceNumber& position,

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp?rev=750456&r1=750455&r2=750456&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp Thu Mar  5 13:28:14 2009
@@ -44,7 +44,7 @@
     ;
 
 EventHeader::EventHeader(EventType t, const ConnectionId& c,  size_t s)
-    : type(t), connectionId(c), size(s), sequence(0) {}
+    : type(t), connectionId(c), size(s), id(0) {}
 
 
 Event::Event() {}
@@ -128,8 +128,7 @@
 }
 
 std::ostream& operator << (std::ostream& o, const EventHeader& e) {
-    o << "[event " << e.getConnectionId()  << "/" << e.getSequence()
-      << " " << e.getType() << " " << e.getSize() << " bytes]";
+    o << "Event[id=" << e.getId() << " connection=" << e.getConnectionId() << 
" " << e.getType() << " " << e.getSize() << " bytes]";
     return o;
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h?rev=750456&r1=750455&r2=750456&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h Thu Mar  5 13:28:14 2009
@@ -57,8 +57,8 @@
     /** Size of header + payload. */ 
     size_t getStoreSize() { return size + HEADER_SIZE; }
 
-    uint64_t getSequence() const { return sequence; }
-    void setSequence(uint64_t n) { sequence = n; }
+    uint64_t getId() const { return id; }
+    void setId(uint64_t n) { id = n; }
 
     bool isCluster() const { return connectionId.getNumber() == 0; }
     bool isConnection() const { return connectionId.getNumber() != 0; }
@@ -69,7 +69,7 @@
     EventType type;
     ConnectionId connectionId;
     size_t size;
-    uint64_t sequence;
+    uint64_t id;
 };
 
 /**

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp?rev=750456&r1=750455&r2=750456&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp Thu Mar  5 13:28:14 2009
@@ -24,16 +24,16 @@
 namespace qpid {
 namespace cluster {
 
-EventFrame::EventFrame() : sequence(0) {}
+EventFrame::EventFrame() : eventId(0) {}
 
 EventFrame::EventFrame(const EventHeader& e, const framing::AMQFrame& f, int 
rc)
-    : connectionId(e.getConnectionId()), frame(f), sequence(e.getSequence()), 
readCredit(rc), type(e.getType())
+    : connectionId(e.getConnectionId()), frame(f), eventId(e.getId()), 
readCredit(rc), type(e.getType())
 {
     QPID_LATENCY_INIT(frame);
 }
 
 std::ostream& operator<<(std::ostream& o, const EventFrame& e) {
-    return o << e.connectionId << "/" << e.sequence << " " << e.frame << " 
rc=" << e.readCredit << " type=" << e.type;
+    return o << e.frame << "(from event " << e.eventId << " read-credit=" << 
e.readCredit << ")";
 }
 
 }} // namespace qpid::cluster

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h?rev=750456&r1=750455&r2=750456&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h Thu Mar  5 13:28:14 2009
@@ -49,14 +49,14 @@
 
     // True if this frame follows immediately after frame e. 
     bool follows(const EventFrame& e) const {
-        return sequence == e.sequence || (sequence == e.sequence+1 && 
e.readCredit);
+        return eventId == e.eventId || (eventId == e.eventId+1 && 
e.readCredit);
     }
 
-    bool operator<(const EventFrame& e) const { return sequence < e.sequence; }
+    bool operator<(const EventFrame& e) const { return eventId < e.eventId; }
     
     ConnectionId connectionId;
     framing::AMQFrame frame;   
-    uint64_t sequence;
+    uint64_t eventId;
     int readCredit; ///< last frame in an event, give credit when processed.
     EventType type;
 };

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=750456&r1=750455&r2=750456&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Thu Mar  5 
13:28:14 2009
@@ -70,17 +70,12 @@
 // Called in write thread when the IO layer has no more data to write.
 // We do nothing in the write thread, we run doOutput only on delivery
 // of doOutput requests.
-bool  OutputInterceptor::doOutput() {
-    QPID_LOG(trace, parent << " write idle.");
-    return false;
-}
+bool  OutputInterceptor::doOutput() { return false; }
 
 // Delivery of doOutput allows us to run the real connection doOutput()
 // which tranfers frames to the codec for writing.
 // 
 void OutputInterceptor::deliverDoOutput(size_t requested) {
-    QPID_LATENCY_RECORD("deliver do-output", *this);
-    QPID_LATENCY_CLEAR(*this);
     size_t buf = getBuffered();
     if (parent.isLocal())
         writeEstimate.delivered(requested, sent, buf); // Update the estimate.
@@ -91,9 +86,7 @@
         moreOutput = parent.getBrokerConnection().doOutput();
     } while (sent < requested && moreOutput);
     sent += buf;                // Include buffered data in the sent total.
-
-    QPID_LOG(trace, "Delivered doOutput: requested=" << requested << " 
output=" << sent << " more=" << moreOutput);
-
+    QPID_LOG(trace, parent << " delivereDoOutput: requested=" << requested << 
" sent=" << sent << " more=" << moreOutput);
     if (parent.isLocal() && moreOutput)  {
         QPID_LOG(trace,  parent << " deliverDoOutput - sending doOutput, more 
output available.");
         sendDoOutput();

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=750456&r1=750455&r2=750456&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Thu Mar  5 13:28:14 
2009
@@ -86,14 +86,14 @@
 // TODO aconway 2008-09-24: optimization: update connections/sessions in 
parallel.
 
 UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, 
const Url& url,
-                           broker::Broker& broker, const ClusterMap& m, 
uint64_t frameId_,
+                           broker::Broker& broker, const ClusterMap& m, 
uint64_t eventId_, uint64_t frameId_, 
                            const Cluster::Connections& cons,
                            const boost::function<void()>& ok,
                            const boost::function<void(const std::exception&)>& 
fail,
                            const client::ConnectionSettings& cs
 )
     : updaterId(updater), updateeId(updatee), updateeUrl(url), 
updaterBroker(broker), map(m),
-      frameId(frameId_), connections(cons), 
+      eventId(eventId_), frameId(frameId_), connections(cons), 
       connection(catchUpConnection()), shadowConnection(catchUpConnection()),
       done(ok), failed(fail), connectionSettings(cs)
 {
@@ -104,7 +104,7 @@
 UpdateClient::~UpdateClient() {}
 
 // Reserved exchange/queue name for catch-up, avoid clashes with user 
queues/exchanges.
-const std::string UpdateClient::UPDATE("qpid.qpid-update");
+const std::string UpdateClient::UPDATE("qpid.cluster-update");
 
 void UpdateClient::run() {
     try {
@@ -120,9 +120,6 @@
     QPID_LOG(debug, updaterId << " updating state to " << updateeId << " at " 
<< updateeUrl);
     Broker& b = updaterBroker;
     b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, 
this, _1));
-
-    // Update exchange is used to route messages to the proper queue without 
modifying routing key.
-    session.exchangeDeclare(arg::exchange=UPDATE, arg::type="fanout", 
arg::autoDelete=true);
     b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueue, this, _1));
     // Update queue is used to transfer acquired messages that are no longer 
on their original queue.
     session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true);
@@ -133,6 +130,7 @@
 
     ClusterConnectionMembershipBody membership;
     map.toMethodBody(membership);
+    membership.setEventId(eventId);
     membership.setFrameId(frameId);
     AMQFrame frame(membership);
     client::ConnectionAccess::getImpl(connection)->handle(frame);
@@ -274,7 +272,7 @@
     SequenceNumber received = ss->receiverGetReceived().command;
     if (inProgress)  
         --received;
-
+             
     // Reset command-sequence state.
     proxy.sessionState(
         ss->senderGetReplayPoint().command,

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h?rev=750456&r1=750455&r2=750456&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h Thu Mar  5 13:28:14 2009
@@ -63,7 +63,7 @@
     static const std::string UPDATE; // Name for special update queue and 
exchange.
     
     UpdateClient(const MemberId& updater, const MemberId& updatee, const Url&,
-                 broker::Broker& donor, const ClusterMap& map, uint64_t 
sequence,
+                 broker::Broker& donor, const ClusterMap& map, uint64_t 
eventId, uint64_t frameId,
                  const std::vector<boost::intrusive_ptr<Connection> >& ,
                  const boost::function<void()>& done,
                  const boost::function<void(const std::exception&)>& fail,
@@ -92,6 +92,7 @@
     Url updateeUrl;
     broker::Broker& updaterBroker;
     ClusterMap map;
+    uint64_t eventId;
     uint64_t frameId;
     std::vector<boost::intrusive_ptr<Connection> > connections;
     client::Connection connection, shadowConnection;

Added: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h?rev=750456&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h Thu Mar  5 13:28:14 
2009
@@ -0,0 +1,45 @@
+#ifndef QPID_CLUSTER_UPDATEEXCHANGE_H
+#define QPID_CLUSTER_UPDATEEXCHANGE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "UpdateClient.h"
+#include "qpid/broker/FanOutExchange.h"
+
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * A keyless exchange (like fanout exchange) that does not modify 
delivery-properties.exchange
+ * on messages.
+ */
+class UpdateExchange : public broker::FanOutExchange
+{
+  public:
+    UpdateExchange(management::Manageable* parent) : 
broker::Exchange(UpdateClient::UPDATE, parent), 
broker::FanOutExchange(UpdateClient::UPDATE, parent) {}
+    void setProperties(const boost::intrusive_ptr<broker::Message>&) {}
+};
+
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_UPDATEEXCHANGE_H*/

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=750456&r1=750455&r2=750456&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Thu Mar  5 13:28:14 2009
@@ -509,10 +509,11 @@
     c0.session.queueDeclare("q");
     c0.session.messageTransfer(arg::content=Message("foo","q"));
     c0.session.messageTransfer(arg::content=Message("bar","q"));
+
     while (c0.session.queueQuery("q").getMessageCount() != 2)
         sys::usleep(1000);    // Wait for message to show up on broker 0.
 
-    // Add a new broker, it should catch up.
+    // Add a new broker, it will catch up.
     cluster.add();
 
     // Do some work post-add
@@ -530,6 +531,7 @@
 
     BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
     BOOST_CHECK_EQUAL(m.getData(), "foo");
+    BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "");
     BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
     BOOST_CHECK_EQUAL(m.getData(), "bar");
     BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);

Modified: qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp?rev=750456&r1=750455&r2=750456&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp Thu Mar  5 13:28:14 2009
@@ -339,7 +339,6 @@
   {
     Connection connection;
     connection.open ( "127.0.0.1", newbie_port );
-    sleep ( 2 );
     connection.close();
     newbie = 0;  // He's no newbie anymore!
     return true;
@@ -368,20 +367,19 @@
     module << moduleDir << "/cluster.so";
     path << srcRoot << "/qpidd";
     prefix << "soak-" << brokerId;
-
-    std::vector<std::string> argv = 
-        list_of<string> ("qpidd")
-                        ("--no-module-dir")
-                        ("--load-module=cluster.so")
-                        ("--cluster-name")
-                        (clusterName)
-                        ("--auth=no")
-                        ("--no-data-dir")
-                        ("--mgmt-enable=no")
-                        ("--log-prefix")
-                        (prefix.str())
-                        ("--log-to-file")
-                        ("/tmp/qpidd.log");
+    std::vector<std::string> argv = list_of<string>
+        ("qpidd")
+        ("--no-module-dir")
+        ("--load-module=cluster.so")
+        ("--cluster-name")
+        (clusterName)
+        ("--auth=no")
+        ("--no-data-dir")
+        ("--mgmt-enable=no")
+        ("--log-prefix")
+        (prefix.str())
+        ("--log-to-file")
+        (prefix.str()+".log");
 
     newbie = new ForkedBroker ( argv );
     newbie_port = newbie->getPort();

Modified: qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp?rev=750456&r1=750455&r2=750456&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/replaying_sender.cpp Thu Mar  5 13:28:14 2009
@@ -72,7 +72,7 @@
         sender.send(message);
         if (count > reportFrequency && !(sent % reportFrequency)) {
             if ( verbosity > 0 )
-                std::cout << "sent " << sent << " of " << count << std::endl;
+                std::cout << "Sender sent " << sent << " of " << count << 
std::endl;
         }
     }
     message.setData("That's all, folks!");

Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=750456&r1=750455&r2=750456&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Thu Mar  5 13:28:14 2009
@@ -132,6 +132,7 @@
     <control name="membership" code="0x21" label="Cluster membership details.">
       <field name="joiners" type="map"/> <!-- member-id -> URL -->
       <field name="members" type="map"/> <!-- member-id -> state -->
+      <field name="event-id" type="uint64"/> <!-- Event id counter value -->
       <field name="frame-id" type="uint64"/>> <!-- Frame id counter value -->
     </control>
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to