Author: aconway Date: Wed Jan 21 12:40:38 2009 New Revision: 736409 URL: http://svn.apache.org/viewvc?rev=736409&view=rev Log: cluster: Pipeline decoding. About 10% improvement in latency and throughput.
Added: qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h (with props) Modified: qpid/trunk/qpid/cpp/src/cluster.mk qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h Modified: qpid/trunk/qpid/cpp/src/cluster.mk URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.mk?rev=736409&r1=736408&r2=736409&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/cluster.mk (original) +++ qpid/trunk/qpid/cpp/src/cluster.mk Wed Jan 21 12:40:38 2009 @@ -36,29 +36,30 @@ dmodule_LTLIBRARIES += cluster.la -cluster_la_SOURCES = \ - $(CMAN_SOURCES) \ - qpid/cluster/Cluster.cpp \ - qpid/cluster/Cluster.h \ +cluster_la_SOURCES = \ + $(CMAN_SOURCES) \ + qpid/cluster/Cluster.cpp \ + qpid/cluster/Cluster.h \ qpid/cluster/ClusterLeaveException.h \ qpid/cluster/ClusterMap.cpp \ qpid/cluster/ClusterMap.h \ - qpid/cluster/ClusterPlugin.cpp \ - qpid/cluster/Connection.cpp \ + qpid/cluster/ClusterPlugin.cpp \ + qpid/cluster/Connection.cpp \ qpid/cluster/Connection.h \ qpid/cluster/ConnectionCodec.cpp \ qpid/cluster/ConnectionCodec.h \ - qpid/cluster/ConnectionMap.h \ + qpid/cluster/ConnectionMap.h \ qpid/cluster/Cpg.cpp \ qpid/cluster/Cpg.h \ qpid/cluster/Dispatchable.h \ - qpid/cluster/DumpClient.cpp \ + qpid/cluster/DumpClient.cpp \ qpid/cluster/DumpClient.h \ qpid/cluster/Event.cpp \ qpid/cluster/Event.h \ - qpid/cluster/FailoverExchange.cpp \ + qpid/cluster/EventFrame.h \ + qpid/cluster/FailoverExchange.cpp \ qpid/cluster/FailoverExchange.h \ - qpid/cluster/Multicaster.cpp \ + qpid/cluster/Multicaster.cpp \ qpid/cluster/Multicaster.h \ qpid/cluster/NoOpConnectionOutputHandler.h \ qpid/cluster/OutputInterceptor.cpp \ Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=736409&r1=736408&r2=736409&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Jan 21 12:40:38 2009 @@ -96,7 +96,8 @@ writeEstimate(writeEstimate_), mcast(cpg, mcastMax, poller, boost::bind(&Cluster::leave, this)), dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)), - deliverQueue(boost::bind(&Cluster::delivered, this, _1), poller), + deliverEventQueue(boost::bind(&Cluster::deliveredEvents, this, _1), poller), + deliverFrameQueue(boost::bind(&Cluster::deliveredFrames, this, _1), poller), state(INIT), lastSize(0), lastBroker(false) @@ -111,7 +112,8 @@ broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); failoverExchange.reset(new FailoverExchange(this)); dispatcher.start(); - deliverQueue.start(); + deliverEventQueue.start(); + deliverFrameQueue.start(); QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl); if (quorum_) quorum.init(); cpg.join(name); @@ -191,14 +193,14 @@ void Cluster::deliver(const Event& e, Lock&) { if (state == LEFT) return; QPID_LOG(trace, *this << " PUSH: " << e); - deliverQueue.push(e); + QPID_LATENCY_INIT(e); + deliverEventQueue.push(e); } -// Entry point: called when deliverQueue has events to process. -void Cluster::delivered(PollableEventQueue::Queue& events) { +// Entry point: called when deliverEventQueue has events to process. +void Cluster::deliveredEvents(PollableEventQueue::Queue& events) { try { - for (PollableEventQueue::Queue::iterator i = events.begin(); i != events.end(); ++i) - deliveredEvent(*i, i->getData()); + for_each(events.begin(), events.end(), boost::bind(&Cluster::deliveredEvent, this, _1)); events.clear(); } catch (const std::exception& e) { QPID_LOG(critical, *this << " error in cluster delivery: " << e.what()); @@ -206,41 +208,52 @@ } } -void Cluster::deliveredEvent(const EventHeader& e, const char* data) { - QPID_LATENCY_RECORD("deliver queue", e); - Buffer buf(const_cast<char*>(data), e.getSize()); - AMQFrame frame; - if (e.isCluster()) { - while (frame.decode(buf)) { - QPID_LOG(trace, *this << " DLVR: " << e << " " << frame); - Mutex::ScopedLock l(lock); // FIXME aconway 2008-12-11: lock scope too big? - ClusterDispatcher dispatch(*this, e.getMemberId(), l); - if (!framing::invoke(dispatch, *frame.getBody()).wasHandled()) - throw Exception(QPID_MSG("Invalid cluster control")); - } +void Cluster::deliveredFrames(PollableFrameQueue::Queue& frames) { + try { + for_each(frames.begin(), frames.end(), boost::bind(&Cluster::deliveredFrame, this, _1)); + frames.clear(); + } catch (const std::exception& e) { + QPID_LOG(critical, *this << " error in cluster delivery: " << e.what()); + leave(); } - else { // e.isConnection() +} + +void Cluster::deliveredEvent(const Event& e) { + QPID_LATENCY_RECORD("delivered event queue", e); + Buffer buf(const_cast<char*>(e.getData()), e.getSize()); + boost::intrusive_ptr<Connection> connection; + if (e.isConnection()) { if (state == NEWBIE) { QPID_LOG(trace, *this << " DROP: " << e); + return; } - else { - boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId()); - if (!connection) return; - if (e.getType() == CONTROL) { - while (frame.decode(buf)) { - QPID_LOG(trace, *this << " DLVR: " << e << " " << frame); - connection->delivered(frame); - } - } - else { - QPID_LOG(trace, *this << " DLVR: " << e); - connection->deliverBuffer(buf); - } + connection = getConnection(e.getConnectionId()); + if (!connection) return; + } + if (e.getType() == CONTROL) { + AMQFrame frame; + while (frame.decode(buf)) { + deliverFrameQueue.push(EventFrame(connection, e.getMemberId(), frame)); } } - QPID_LATENCY_RECORD("decode+execute", e); + else if (e.getType() == DATA) { + connection->deliveredEvent(e, deliverFrameQueue); + } } +void Cluster::deliveredFrame(const EventFrame& e) { + QPID_LOG(trace, *this << " DLVR: " << e.frame); + if (e.connection) { + e.connection->deliveredFrame(e); + } + else { + Mutex::ScopedLock l(lock); // FIXME aconway 2008-12-11: lock scope too big? + ClusterDispatcher dispatch(*this, e.member, l); + if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled()) + throw Exception(QPID_MSG("Invalid cluster control")); + } +} + struct AddrList { const cpg_address* addrs; int count; @@ -379,7 +392,7 @@ setClusterId(uuid); state = DUMPEE; QPID_LOG(info, *this << " receiving dump from " << dumper); - deliverQueue.stop(); + deliverEventQueue.stop(); checkDumpIn(l); } } @@ -389,7 +402,7 @@ assert(state == OFFER); state = DUMPER; QPID_LOG(info, *this << " stall for dump to " << dumpee << " at " << url); - deliverQueue.stop(); + deliverEventQueue.stop(); if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread. dumpThread = Thread( new DumpClient(myId, dumpee, url, broker, map, connections.values(), @@ -411,7 +424,7 @@ mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), myId); state = CATCHUP; QPID_LOG(info, *this << " received dump, starting catch-up"); - deliverQueue.start(); + deliverEventQueue.start(); } } @@ -425,7 +438,7 @@ state = READY; mcast.release(); QPID_LOG(info, *this << " sent dump"); - deliverQueue.start(); + deliverEventQueue.start(); tryMakeOffer(map.firstNewbie(), l); // Try another offer } Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=736409&r1=736408&r2=736409&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Wed Jan 21 12:40:38 2009 @@ -25,6 +25,7 @@ #include "Event.h" #include "FailoverExchange.h" #include "Multicaster.h" +#include "EventFrame.h" #include "NoOpConnectionOutputHandler.h" #include "PollerDispatch.h" #include "Quorum.h" @@ -102,7 +103,7 @@ typedef sys::Monitor::ScopedLock Lock; typedef sys::PollableQueue<Event> PollableEventQueue; - typedef std::deque<Event> PlainEventQueue; + typedef sys::PollableQueue<EventFrame> PollableFrameQueue; // NB: The final Lock& parameter on functions below is used to mark functions // that should only be called by a function that already holds the lock. @@ -126,8 +127,10 @@ void ready(const MemberId&, const std::string&, Lock&); void configChange(const MemberId&, const std::string& addresses, Lock& l); void shutdown(const MemberId&, Lock&); - void delivered(PollableEventQueue::Queue&); // deliverQueue callback - void deliveredEvent(const EventHeader&, const char*); + void deliveredEvents(PollableEventQueue::Queue&); + void deliveredFrames(PollableFrameQueue::Queue&); + void deliveredEvent(const Event&); + void deliveredFrame(const EventFrame&); // Helper, called in deliver thread. void dumpStart(const MemberId& dumpee, const Url& url, Lock&); @@ -185,7 +188,8 @@ // Thread safe members Multicaster mcast; PollerDispatch dispatcher; - PollableEventQueue deliverQueue; + PollableEventQueue deliverEventQueue; + PollableFrameQueue deliverFrameQueue; ConnectionMap connections; boost::shared_ptr<FailoverExchange> failoverExchange; Quorum quorum; Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=736409&r1=736408&r2=736409&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Wed Jan 21 12:40:38 2009 @@ -62,14 +62,14 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& wrappedId, ConnectionId myId) : cluster(c), self(myId), catchUp(false), output(*this, out), - connection(&output, cluster.getBroker(), wrappedId) + connection(&output, cluster.getBroker(), wrappedId), readCredit(0) { init(); } // Local connections Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& wrappedId, MemberId myId, bool isCatchUp) : cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out), - connection(&output, cluster.getBroker(), wrappedId) + connection(&output, cluster.getBroker(), wrappedId), readCredit(0) { init(); } void Connection::init() { @@ -135,17 +135,35 @@ return !message.empty(); } +// Decode buffer and put frames on frameq. +void Connection::deliveredEvent(const Event& e, EventFrameQueue& frameq) { + assert(!catchUp); + Buffer buf(e); + // Set read credit on the last frame. + ++readCredit; // One credit per buffer. + if (!mcastDecoder.decode(buf)) return; + AMQFrame frame(mcastDecoder.frame); + while (mcastDecoder.decode(buf)) { + frameq.push(EventFrame(this, getId().getMember(), frame)); + frame = mcastDecoder.frame; + } + frameq.push(EventFrame(this, getId().getMember(), frame, readCredit)); + readCredit = 0; +} + + // Delivered from cluster. -void Connection::delivered(framing::AMQFrame& f) { - QPID_LOG(trace, cluster << " RECV: " << *this << ": " << f); - QPID_LATENCY_INIT(f); +void Connection::deliveredFrame(const EventFrame& f) { + QPID_LOG(trace, cluster << " RECV: " << *this << ": " << f.frame); assert(!catchUp); - currentChannel = f.getChannel(); - if (!framing::invoke(*this, *f.getBody()).wasHandled() // Connection contol. - && !checkUnsupported(*f.getBody())) // Unsupported operation. + currentChannel = f.frame.getChannel(); + if (!framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol. + && !checkUnsupported(*f.frame.getBody())) // Unsupported operation. { - connection.received(f); // Pass to broker connection. + connection.received(const_cast<AMQFrame&>(f.frame)); // Pass to broker connection. } + if (cluster.getReadMax() && f.readCredit) + output.giveReadCredit(f.readCredit); } // A local connection is closed by the network layer. @@ -200,15 +218,6 @@ return size; } -void Connection::deliverBuffer(Buffer& buf) { - assert(!catchUp); - ++deliverSeq; - while (mcastDecoder.decode(buf)) - delivered(mcastDecoder.frame); - if (cluster.getReadMax()) - output.giveReadCredit(1); -} - broker::SessionState& Connection::sessionState() { return *connection.getChannel(currentChannel).getSession(); } Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=736409&r1=736408&r2=736409&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Wed Jan 21 12:40:38 2009 @@ -26,6 +26,8 @@ #include "WriteEstimate.h" #include "OutputInterceptor.h" #include "NoOpConnectionOutputHandler.h" +#include "Event.h" +#include "EventFrame.h" #include "qpid/broker/Connection.h" #include "qpid/amqp_0_10/Connection.h" @@ -49,6 +51,7 @@ namespace cluster { class Cluster; +class Event; /** Intercept broker::Connection calls for shadow and local cluster connections. */ class Connection : @@ -58,6 +61,8 @@ { public: + typedef sys::PollableQueue<EventFrame> EventFrameQueue; + /** Local connection, use this in ConnectionId */ Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, MemberId, bool catchUp); /** Shadow connection */ @@ -96,8 +101,8 @@ size_t decode(const char* buffer, size_t size); // Called for data delivered from the cluster. - void deliverBuffer(framing::Buffer&); - void delivered(framing::AMQFrame&); + void deliveredEvent(const Event&, EventFrameQueue&); + void deliveredFrame(const EventFrame&); void consumerState(const std::string& name, bool blocked, bool notifyEnabled); @@ -166,6 +171,7 @@ framing::SequenceNumber deliverSeq; framing::ChannelId currentChannel; boost::shared_ptr<broker::TxBuffer> txBuffer; + int readCredit; friend std::ostream& operator<<(std::ostream&, const Connection&); }; Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h?rev=736409&r1=736408&r2=736409&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h Wed Jan 21 12:40:38 2009 @@ -31,12 +31,15 @@ #include <sys/uio.h> // For iovec #include <iosfwd> +#include "types.h" + namespace qpid { -namespace cluster { -// TODO aconway 2008-09-03: more efficient solution for shared -// byte-stream data. -// +namespace framing { +class AMQBody; +} + +namespace cluster { /** Header data for a multicast event */ class EventHeader : public ::qpid::sys::LatencyMetricTimestamp { Added: qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h?rev=736409&view=auto ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h (added) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h Wed Jan 21 12:40:38 2009 @@ -0,0 +1,53 @@ +#ifndef QPID_CLUSTER_EVENTFRAME_H +#define QPID_CLUSTER_EVENTFRAME_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "types.h" +#include "qpid/framing/AMQFrame.h" +#include <boost/intrusive_ptr.hpp> + +namespace qpid { +namespace cluster { + +class Connection; + +/** + * A frame decoded from an Event. + */ +struct EventFrame +{ + // Connection event frame + EventFrame(const boost::intrusive_ptr<Connection>& c, const MemberId& m, const framing::AMQFrame& f, int rc=0) + : connection(c), member(m), frame(f), readCredit(rc) {} + + bool isCluster() const { return !connection; } + bool isConnection() const { return connection; } + + boost::intrusive_ptr<Connection> connection; + MemberId member; + framing::AMQFrame frame; + int readCredit; // restore this much read credit when frame is processed +}; +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_EVENTFRAME_H*/ Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h ------------------------------------------------------------------------------ svn:eol-style = native Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h ------------------------------------------------------------------------------ svn:keywords = Rev Date --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org