Author: aconway
Date: Tue Jan 27 01:44:02 2009
New Revision: 737968

URL: http://svn.apache.org/viewvc?rev=737968&view=rev
Log:
cluster: Add sequence number to events & frames

Added:
    qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterQueueHandler.h   (with props)
Removed:
    qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterLeaveException.h
Modified:
    qpid/trunk/qpid/cpp/src/cluster.mk
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
    qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h

Modified: qpid/trunk/qpid/cpp/src/cluster.mk
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.mk?rev=737968&r1=737967&r2=737968&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ qpid/trunk/qpid/cpp/src/cluster.mk Tue Jan 27 01:44:02 2009
@@ -40,7 +40,7 @@
   $(CMAN_SOURCES)                              \
   qpid/cluster/Cluster.cpp                     \
   qpid/cluster/Cluster.h                       \
-  qpid/cluster/ClusterLeaveException.h         \
+  qpid/cluster/ClusterQueueHandler.h           \
   qpid/cluster/ClusterMap.cpp                  \
   qpid/cluster/ClusterMap.h                    \
   qpid/cluster/ClusterPlugin.cpp               \

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=737968&r1=737967&r2=737968&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Jan 27 01:44:02 2009
@@ -20,6 +20,7 @@
 #include "Connection.h"
 #include "DumpClient.h"
 #include "FailoverExchange.h"
+#include "ClusterQueueHandler.h"
 
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/SessionState.h"
@@ -97,11 +98,12 @@
     writeEstimate(writeEstimate_),
     mcast(cpg, mcastMax, poller, boost::bind(&Cluster::leave, this)),
     dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)),
-    deliverEventQueue(boost::bind(&Cluster::deliveredEvents, this, _1), 
poller),
-    deliverFrameQueue(boost::bind(&Cluster::deliveredFrames, this, _1), 
poller),
+    deliverEventQueue(ClusterQueueHandler<Event>(this, 
boost::bind(&Cluster::deliveredEvent, this, _1), "event queue"),  poller),
+    deliverFrameQueue(ClusterQueueHandler<EventFrame>(this, 
boost::bind(&Cluster::deliveredFrame, this, _1), "frame queue"), poller),
     state(INIT),
     lastSize(0),
-    lastBroker(false)
+    lastBroker(false),
+    sequence(0)
 {
     mAgent = ManagementAgent::Singleton::getInstance();
     if (mAgent != 0){
@@ -195,6 +197,7 @@
     MemberId from(nodeid, pid);
     framing::Buffer buf(static_cast<char*>(msg), msg_len);
     Event e(Event::decodeCopy(from, buf));
+    e.setSequence(sequence++);
     if (from == myId)  // Record self-deliveries for flow control.
         mcast.selfDeliver(e);
     deliver(e, l);
@@ -208,26 +211,6 @@
 }
 
 // Entry point: called when deliverEventQueue has events to process.
-void Cluster::deliveredEvents(PollableEventQueue::Queue& events) {
-    try {
-        for_each(events.begin(), events.end(), 
boost::bind(&Cluster::deliveredEvent, this, _1));
-        events.clear();
-    } catch (const std::exception& e) {
-        QPID_LOG(critical, *this << " error in cluster delivery: " << 
e.what());
-        leave();
-    }
-}
-
-void Cluster::deliveredFrames(PollableFrameQueue::Queue& frames) {
-    try {
-        for_each(frames.begin(), frames.end(), 
boost::bind(&Cluster::deliveredFrame, this, _1));
-        frames.clear();
-    } catch (const std::exception& e) {
-        QPID_LOG(critical, *this << " error in cluster delivery: " << 
e.what());
-        leave();
-    }
-}
-
 void Cluster::deliveredEvent(const Event& e) {
     QPID_LATENCY_RECORD("delivered event queue", e);
     Buffer buf(const_cast<char*>(e.getData()), e.getSize());
@@ -243,7 +226,7 @@
     if (e.getType() == CONTROL) {
         AMQFrame frame;
         while (frame.decode(buf)) {
-            deliverFrameQueue.push(EventFrame(connection, e.getMemberId(), 
frame));
+            deliverFrameQueue.push(EventFrame(connection, e, frame));
         }
     }
     else if (e.getType() == DATA) { 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=737968&r1=737967&r2=737968&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Tue Jan 27 01:44:02 2009
@@ -129,8 +129,6 @@
     void ready(const MemberId&, const std::string&, Lock&);
     void configChange(const MemberId&, const std::string& addresses, Lock& l);
     void shutdown(const MemberId&, Lock&);
-    void deliveredEvents(PollableEventQueue::Queue&);
-    void deliveredFrames(PollableFrameQueue::Queue&);
     void deliveredEvent(const Event&); 
     void deliveredFrame(const EventFrame&); 
 
@@ -215,6 +213,7 @@
     ClusterMap map;
     size_t lastSize;
     bool lastBroker;
+    uint64_t sequence;
 
     //     Dump related
     sys::Thread dumpThread;

Added: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterQueueHandler.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterQueueHandler.h?rev=737968&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterQueueHandler.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterQueueHandler.h Tue Jan 27 01:44:02 2009
@@ -0,0 +1,56 @@
+#ifndef QPID_CLUSTER_CLUSTERQUEUEHANDLER_H
+#define QPID_CLUSTER_CLUSTERQUEUEHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Cluster.h"
+#include "qpid/sys/PollableQueue.h"
+#include  <qpid/log/Statement.h>
+
+namespace qpid {
+namespace cluster {
+
+/** Convenience functor for PollableQueue callbacks.
+ *
+ * Adapts a per-item callback to the batch signature used by
+ * sys::PollableQueue<T>: operator() applies `callback` to every value in
+ * the batch, in order, and then clears the batch.
+ *
+ * If the callback throws a std::exception, the error is logged together
+ * with the queue `name` and cluster.leave() is called. The batch is NOT
+ * cleared in that case. Exceptions not derived from std::exception are
+ * not caught here.
+ *
+ * Two constructors are provided: one takes the Cluster by reference, the
+ * other takes a const Cluster* and casts away constness to store a
+ * mutable reference.
+ */
+template <class T> struct ClusterQueueHandler {
+    ClusterQueueHandler(Cluster& c, boost::function<void (const T&)> f, const 
std::string& n) : cluster(c), callback(f), name(n) {}
+    ClusterQueueHandler(const Cluster* c, boost::function<void (const T&)> f, 
const std::string& n) : cluster(*const_cast<Cluster*>(c)), callback(f), name(n) 
{}
+
+    void operator()(typename sys::PollableQueue<T>::Queue& values) {
+        try {
+            std::for_each(values.begin(), values.end(), callback);
+            values.clear();  // Reached only if every callback returned normally.
+        }
+        catch (const std::exception& e) {
+            QPID_LOG(error, "Error on " << name << ": " << e.what());
+            cluster.leave();
+        }
+    }
+
+    Cluster& cluster;                           // Cluster whose leave() is called on error.
+    boost::function<void (const T&)> callback;  // Invoked once per queued value.
+    std::string name;                           // Queue name used in the error log message.
+};
+
+    
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_CLUSTERQUEUEHANDLER_H*/

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterQueueHandler.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterQueueHandler.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=737968&r1=737967&r2=737968&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Tue Jan 27 01:44:02 2009
@@ -137,7 +137,7 @@
 }
 
 // Decode buffer and put frames on frameq.
-void Connection::deliveredEvent(const Event& e, EventFrameQueue& frameq) {
+void Connection::deliveredEvent(const Event& e, PollableFrameQueue& frameq) {
     assert(!catchUp);
     Buffer buf(e);
     // Set read credit on the last frame.
@@ -145,10 +145,10 @@
     if (!mcastDecoder.decode(buf)) return;
     AMQFrame frame(mcastDecoder.frame);
     while (mcastDecoder.decode(buf)) {
-        frameq.push(EventFrame(this, getId().getMember(), frame));
+        frameq.push(EventFrame(this, e, frame));
         frame = mcastDecoder.frame;
     }
-    frameq.push(EventFrame(this, getId().getMember(), frame, readCredit));
+    frameq.push(EventFrame(this, e, frame, readCredit));
     readCredit = 0;
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=737968&r1=737967&r2=737968&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Tue Jan 27 01:44:02 2009
@@ -26,7 +26,6 @@
 #include "WriteEstimate.h"
 #include "OutputInterceptor.h"
 #include "NoOpConnectionOutputHandler.h"
-#include "Event.h"
 #include "EventFrame.h"
 
 #include "qpid/broker/Connection.h"
@@ -62,7 +61,7 @@
         
 {
   public:
-    typedef sys::PollableQueue<EventFrame> EventFrameQueue;
+    typedef sys::PollableQueue<EventFrame> PollableFrameQueue;
 
     /** Local connection, use this in ConnectionId */
     Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& 
id, MemberId, bool catchUp, bool isLink);
@@ -102,7 +101,7 @@
     size_t decode(const char* buffer, size_t size);
 
     // Called for data delivered from the cluster.
-    void deliveredEvent(const Event&, EventFrameQueue&);
+    void deliveredEvent(const Event&, PollableFrameQueue&);
     void deliveredFrame(const EventFrame&);
 
     void consumerState(const std::string& name, bool blocked, bool 
notifyEnabled);

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp?rev=737968&r1=737967&r2=737968&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp Tue Jan 27 01:44:02 2009
@@ -42,7 +42,7 @@
     ;
 
 EventHeader::EventHeader(EventType t, const ConnectionId& c,  size_t s)
-    : type(t), connectionId(c), size(s) {}
+    : type(t), connectionId(c), size(s), sequence(0) {}
 
 
 Event::Event() {}
@@ -53,10 +53,10 @@
 
 void EventHeader::decode(const MemberId& m, framing::Buffer& buf) {
     if (buf.available() <= HEADER_SIZE)
-        throw ClusterLeaveException("Not enough for multicast header");
+        throw Exception("Not enough for multicast header");
     type = (EventType)buf.getOctet();
     if(type != DATA && type != CONTROL)
-        throw ClusterLeaveException("Invalid multicast event type");
+        throw Exception("Invalid multicast event type");
     connectionId = ConnectionId(m, 
reinterpret_cast<Connection*>(buf.getLongLong()));
     size = buf.getLong();
 #ifdef QPID_LATENCY_METRIC
@@ -68,7 +68,7 @@
     Event e;
     e.decode(m, buf);           // Header
     if (buf.available() < e.size)
-        throw ClusterLeaveException("Not enough data for multicast event");
+        throw Exception("Not enough data for multicast event");
     e.store = RefCountedBuffer::create(e.size + HEADER_SIZE);
     memcpy(e.getData(), buf.getPointer() + buf.getPosition(), e.size);
     return e;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h?rev=737968&r1=737967&r2=737968&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h Tue Jan 27 01:44:02 2009
@@ -23,10 +23,7 @@
  */
 
 #include "types.h"
-#include "Cpg.h"
-#include "Connection.h"
 #include "qpid/RefCountedBuffer.h"
-#include "qpid/framing/Buffer.h"
 #include "qpid/sys/LatencyMetric.h"
 #include <sys/uio.h>            // For iovec
 #include <iosfwd>
@@ -37,6 +34,7 @@
 
 namespace framing {
 class AMQBody;
+class Buffer;
 }
 
 namespace cluster {
@@ -52,6 +50,8 @@
     ConnectionId getConnectionId() const { return connectionId; }
     MemberId getMemberId() const { return connectionId.getMember(); }
     size_t getSize() const { return size; }
+    uint64_t getSequence() const { return sequence; }
+    void setSequence(uint64_t n) { sequence = n; }
 
     bool isCluster() const { return connectionId.getPointer() == 0; }
     bool isConnection() const { return connectionId.getPointer() != 0; }
@@ -62,6 +62,7 @@
     EventType type;
     ConnectionId connectionId;
     size_t size;
+    uint64_t sequence;
 };
 
 /**

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h?rev=737968&r1=737967&r2=737968&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h Tue Jan 27 01:44:02 2009
@@ -23,6 +23,7 @@
  */
 
 #include "types.h"
+#include "Event.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/sys/LatencyMetric.h"
 #include <boost/intrusive_ptr.hpp>
@@ -37,19 +38,30 @@
  */
 struct EventFrame
 {
+    // Default frame: sequence 0 and no read credit. readCredit must be
+    // initialized here because isLastInEvent() and follows() read it.
+    EventFrame() : sequence(0), readCredit(0) {}
     // Connection event frame
-    EventFrame(const boost::intrusive_ptr<Connection>& c, const MemberId& m, 
const framing::AMQFrame& f, int rc=0)
-        : connection(c), member(m), frame(f), readCredit(rc) {
+    // The member id and sequence number are taken from the event e that the
+    // frame was decoded from; rc is the read credit carried by this frame.
+    EventFrame(const boost::intrusive_ptr<Connection>& c, const Event& e, 
const framing::AMQFrame& f, int rc=0)
+        : connection(c), member(e.getMemberId()), frame(f), 
sequence(e.getSequence()), readCredit(rc)
+    {
         QPID_LATENCY_INIT(frame);
     }
 
     bool isCluster() const { return !connection; }
     bool isConnection() const { return connection; }
+    // True when the frame carries non-zero read credit.
+    // NOTE(review): a frame built with the default rc=0 reports false here
+    // even if it is the last frame of its event -- confirm that callers
+    // relying on this always pass credit on the last frame.
+    bool isLastInEvent() const { return readCredit; }
+
+    // True if this frame follows immediately after frame e. 
+    // That is: same event sequence number, or the next sequence number
+    // when e carries read credit.
+    bool follows(const EventFrame& e) const {
+        return sequence == e.sequence || (sequence == e.sequence+1 && 
e.readCredit);
+    }
+
+    // Orders frames by event sequence number only.
+    bool operator<(const EventFrame& e) const { return sequence < e.sequence; }
     
     boost::intrusive_ptr<Connection> connection;
     MemberId member;
     framing::AMQFrame frame;   
-    int readCredit;             // restore this much read credit when frame is 
processed
+    uint64_t sequence;          // sequence number of the event this frame came from
+    int readCredit;             // last frame in an event, give credit when 
processed.
 };
 }} // namespace qpid::cluster
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp?rev=737968&r1=737967&r2=737968&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp Tue Jan 27 01:44:02 2009
@@ -21,7 +21,6 @@
 
 #include "Multicaster.h"
 #include "Cpg.h"
-#include "ClusterLeaveException.h"
 #include "qpid/log/Statement.h"
 #include "qpid/sys/LatencyMetric.h"
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h?rev=737968&r1=737967&r2=737968&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/types.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/types.h Tue Jan 27 01:44:02 2009
@@ -22,8 +22,6 @@
  *
  */
 
-
-#include "ClusterLeaveException.h"
 #include "config.h"
 #include "qpid/Url.h"
 

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h?rev=737968&r1=737967&r2=737968&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h Tue Jan 27 01:44:02 2009
@@ -46,6 +46,7 @@
 class PollableQueue {
   public:
     typedef std::deque<T> Queue;
+    typedef T value_type;
 
     /**
      * Callback to process a batch of items from the queue.



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to