Author: aconway Date: Tue Jan 27 01:44:02 2009 New Revision: 737968 URL: http://svn.apache.org/viewvc?rev=737968&view=rev Log: cluster: Add sequence number to events & frames
Added: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterQueueHandler.h (with props) Removed: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterLeaveException.h Modified: qpid/trunk/qpid/cpp/src/cluster.mk qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/types.h qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h Modified: qpid/trunk/qpid/cpp/src/cluster.mk URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.mk?rev=737968&r1=737967&r2=737968&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/cluster.mk (original) +++ qpid/trunk/qpid/cpp/src/cluster.mk Tue Jan 27 01:44:02 2009 @@ -40,7 +40,7 @@ $(CMAN_SOURCES) \ qpid/cluster/Cluster.cpp \ qpid/cluster/Cluster.h \ - qpid/cluster/ClusterLeaveException.h \ + qpid/cluster/ClusterQueueHandler.h \ qpid/cluster/ClusterMap.cpp \ qpid/cluster/ClusterMap.h \ qpid/cluster/ClusterPlugin.cpp \ Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=737968&r1=737967&r2=737968&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Jan 27 01:44:02 2009 @@ -20,6 +20,7 @@ #include "Connection.h" #include "DumpClient.h" #include "FailoverExchange.h" +#include "ClusterQueueHandler.h" #include "qpid/broker/Broker.h" #include "qpid/broker/SessionState.h" @@ -97,11 +98,12 @@ writeEstimate(writeEstimate_), mcast(cpg, mcastMax, poller, boost::bind(&Cluster::leave, this)), dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)), - deliverEventQueue(boost::bind(&Cluster::deliveredEvents, this, _1), poller), - deliverFrameQueue(boost::bind(&Cluster::deliveredFrames, this, _1), poller), + deliverEventQueue(ClusterQueueHandler<Event>(this, boost::bind(&Cluster::deliveredEvent, this, _1), "event queue"), poller), + deliverFrameQueue(ClusterQueueHandler<EventFrame>(this, boost::bind(&Cluster::deliveredFrame, this, _1), "frame queue"), poller), state(INIT), lastSize(0), - lastBroker(false) + lastBroker(false), + sequence(0) { mAgent = ManagementAgent::Singleton::getInstance(); if (mAgent != 0){ @@ -195,6 +197,7 @@ MemberId from(nodeid, pid); framing::Buffer buf(static_cast<char*>(msg), msg_len); Event e(Event::decodeCopy(from, buf)); + e.setSequence(sequence++); if (from == myId) // Record self-deliveries for flow control. mcast.selfDeliver(e); deliver(e, l); @@ -208,26 +211,6 @@ } // Entry point: called when deliverEventQueue has events to process. -void Cluster::deliveredEvents(PollableEventQueue::Queue& events) { - try { - for_each(events.begin(), events.end(), boost::bind(&Cluster::deliveredEvent, this, _1)); - events.clear(); - } catch (const std::exception& e) { - QPID_LOG(critical, *this << " error in cluster delivery: " << e.what()); - leave(); - } -} - -void Cluster::deliveredFrames(PollableFrameQueue::Queue& frames) { - try { - for_each(frames.begin(), frames.end(), boost::bind(&Cluster::deliveredFrame, this, _1)); - frames.clear(); - } catch (const std::exception& e) { - QPID_LOG(critical, *this << " error in cluster delivery: " << e.what()); - leave(); - } -} - void Cluster::deliveredEvent(const Event& e) { QPID_LATENCY_RECORD("delivered event queue", e); Buffer buf(const_cast<char*>(e.getData()), e.getSize()); @@ -243,7 +226,7 @@ if (e.getType() == CONTROL) { AMQFrame frame; while (frame.decode(buf)) { - deliverFrameQueue.push(EventFrame(connection, e.getMemberId(), frame)); + deliverFrameQueue.push(EventFrame(connection, e, frame)); } } else if (e.getType() == DATA) { Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=737968&r1=737967&r2=737968&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Tue Jan 27 01:44:02 2009 @@ -129,8 +129,6 @@ void ready(const MemberId&, const std::string&, Lock&); void configChange(const MemberId&, const std::string& addresses, Lock& l); void shutdown(const MemberId&, Lock&); - void deliveredEvents(PollableEventQueue::Queue&); - void deliveredFrames(PollableFrameQueue::Queue&); void deliveredEvent(const Event&); void deliveredFrame(const EventFrame&); @@ -215,6 +213,7 @@ ClusterMap map; size_t lastSize; bool lastBroker; + uint64_t sequence; // Dump related sys::Thread dumpThread; Added: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterQueueHandler.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterQueueHandler.h?rev=737968&view=auto ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterQueueHandler.h (added) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterQueueHandler.h Tue Jan 27 01:44:02 2009 @@ -0,0 +1,56 @@ +#ifndef QPID_CLUSTER_CLUSTERQUEUEHANDLER_H +#define QPID_CLUSTER_CLUSTERQUEUEHANDLER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Cluster.h" +#include "qpid/sys/PollableQueue.h" +#include <qpid/log/Statement.h> + +namespace qpid { +namespace cluster { + +/** Convenience functor for PollableQueue callbacks. */ +template <class T> struct ClusterQueueHandler { + ClusterQueueHandler(Cluster& c, boost::function<void (const T&)> f, const std::string& n) : cluster(c), callback(f), name(n) {} + ClusterQueueHandler(const Cluster* c, boost::function<void (const T&)> f, const std::string& n) : cluster(*const_cast<Cluster*>(c)), callback(f), name(n) {} + + void operator()(typename sys::PollableQueue<T>::Queue& values) { + try { + std::for_each(values.begin(), values.end(), callback); + values.clear(); + } + catch (const std::exception& e) { + QPID_LOG(error, "Error on " << name << ": " << e.what()); + cluster.leave(); + } + } + + Cluster& cluster; + boost::function<void (const T&)> callback; + std::string name; +}; + + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_CLUSTERQUEUEHANDLER_H*/ Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterQueueHandler.h ------------------------------------------------------------------------------ svn:eol-style = native Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterQueueHandler.h ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=737968&r1=737967&r2=737968&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Tue Jan 27 01:44:02 2009 @@ -137,7 +137,7 @@ } // Decode buffer and put frames on frameq. -void Connection::deliveredEvent(const Event& e, EventFrameQueue& frameq) { +void Connection::deliveredEvent(const Event& e, PollableFrameQueue& frameq) { assert(!catchUp); Buffer buf(e); // Set read credit on the last frame. @@ -145,10 +145,10 @@ if (!mcastDecoder.decode(buf)) return; AMQFrame frame(mcastDecoder.frame); while (mcastDecoder.decode(buf)) { - frameq.push(EventFrame(this, getId().getMember(), frame)); + frameq.push(EventFrame(this, e, frame)); frame = mcastDecoder.frame; } - frameq.push(EventFrame(this, getId().getMember(), frame, readCredit)); + frameq.push(EventFrame(this, e, frame, readCredit)); readCredit = 0; } Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=737968&r1=737967&r2=737968&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Tue Jan 27 01:44:02 2009 @@ -26,7 +26,6 @@ #include "WriteEstimate.h" #include "OutputInterceptor.h" #include "NoOpConnectionOutputHandler.h" -#include "Event.h" #include "EventFrame.h" #include "qpid/broker/Connection.h" @@ -62,7 +61,7 @@ { public: - typedef sys::PollableQueue<EventFrame> EventFrameQueue; + typedef sys::PollableQueue<EventFrame> PollableFrameQueue; /** Local connection, use this in ConnectionId */ Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, MemberId, bool catchUp, bool isLink); @@ -102,7 +101,7 @@ size_t decode(const char* buffer, size_t size); // Called for data delivered from the cluster. - void deliveredEvent(const Event&, EventFrameQueue&); + void deliveredEvent(const Event&, PollableFrameQueue&); void deliveredFrame(const EventFrame&); void consumerState(const std::string& name, bool blocked, bool notifyEnabled); Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp?rev=737968&r1=737967&r2=737968&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp Tue Jan 27 01:44:02 2009 @@ -42,7 +42,7 @@ ; EventHeader::EventHeader(EventType t, const ConnectionId& c, size_t s) - : type(t), connectionId(c), size(s) {} + : type(t), connectionId(c), size(s), sequence(0) {} Event::Event() {} @@ -53,10 +53,10 @@ void EventHeader::decode(const MemberId& m, framing::Buffer& buf) { if (buf.available() <= HEADER_SIZE) - throw ClusterLeaveException("Not enough for multicast header"); + throw Exception("Not enough for multicast header"); type = (EventType)buf.getOctet(); if(type != DATA && type != CONTROL) - throw ClusterLeaveException("Invalid multicast event type"); + throw Exception("Invalid multicast event type"); connectionId = ConnectionId(m, reinterpret_cast<Connection*>(buf.getLongLong())); size = buf.getLong(); #ifdef QPID_LATENCY_METRIC @@ -68,7 +68,7 @@ Event e; e.decode(m, buf); // Header if (buf.available() < e.size) - throw ClusterLeaveException("Not enough data for multicast event"); + throw Exception("Not enough data for multicast event"); e.store = RefCountedBuffer::create(e.size + HEADER_SIZE); memcpy(e.getData(), buf.getPointer() + buf.getPosition(), e.size); return e; Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h?rev=737968&r1=737967&r2=737968&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h Tue Jan 27 01:44:02 2009 @@ -23,10 +23,7 @@ */ #include "types.h" -#include "Cpg.h" -#include "Connection.h" #include "qpid/RefCountedBuffer.h" -#include "qpid/framing/Buffer.h" #include "qpid/sys/LatencyMetric.h" #include <sys/uio.h> // For iovec #include <iosfwd> @@ -37,6 +34,7 @@ namespace framing { class AMQBody; +class Buffer; } namespace cluster { @@ -52,6 +50,8 @@ ConnectionId getConnectionId() const { return connectionId; } MemberId getMemberId() const { return connectionId.getMember(); } size_t getSize() const { return size; } + uint64_t getSequence() const { return sequence; } + void setSequence(uint64_t n) { sequence = n; } bool isCluster() const { return connectionId.getPointer() == 0; } bool isConnection() const { return connectionId.getPointer() != 0; } @@ -62,6 +62,7 @@ EventType type; ConnectionId connectionId; size_t size; + uint64_t sequence; }; /** Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h?rev=737968&r1=737967&r2=737968&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h Tue Jan 27 01:44:02 2009 @@ -23,6 +23,7 @@ */ #include "types.h" +#include "Event.h" #include "qpid/framing/AMQFrame.h" #include "qpid/sys/LatencyMetric.h" #include <boost/intrusive_ptr.hpp> @@ -37,19 +38,30 @@ */ struct EventFrame { + EventFrame() : sequence(0) {} // Connection event frame - EventFrame(const boost::intrusive_ptr<Connection>& c, const MemberId& m, const framing::AMQFrame& f, int rc=0) - : connection(c), member(m), frame(f), readCredit(rc) { + EventFrame(const boost::intrusive_ptr<Connection>& c, const Event& e, const framing::AMQFrame& f, int rc=0) + : connection(c), member(e.getMemberId()), frame(f), sequence(e.getSequence()), readCredit(rc) + { QPID_LATENCY_INIT(frame); } bool isCluster() const { return !connection; } bool isConnection() const { return connection; } + bool isLastInEvent() const { return readCredit; } + + // True if this frame follows immediately after frame e. + bool follows(const EventFrame& e) const { + return sequence == e.sequence || (sequence == e.sequence+1 && e.readCredit); + } + + bool operator<(const EventFrame& e) const { return sequence < e.sequence; } boost::intrusive_ptr<Connection> connection; MemberId member; framing::AMQFrame frame; - int readCredit; // restore this much read credit when frame is processed + uint64_t sequence; + int readCredit; // last frame in an event, give credit when processed. }; }} // namespace qpid::cluster Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp?rev=737968&r1=737967&r2=737968&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp Tue Jan 27 01:44:02 2009 @@ -21,7 +21,6 @@ #include "Multicaster.h" #include "Cpg.h" -#include "ClusterLeaveException.h" #include "qpid/log/Statement.h" #include "qpid/sys/LatencyMetric.h" Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/types.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h?rev=737968&r1=737967&r2=737968&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/types.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/types.h Tue Jan 27 01:44:02 2009 @@ -22,8 +22,6 @@ * */ - -#include "ClusterLeaveException.h" #include "config.h" #include "qpid/Url.h" Modified: qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h?rev=737968&r1=737967&r2=737968&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h Tue Jan 27 01:44:02 2009 @@ -46,6 +46,7 @@ class PollableQueue { public: typedef std::deque<T> Queue; + typedef T value_type; /** * Callback to process a batch of items from the queue. --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org