Author: aconway
Date: Tue Feb 24 19:48:54 2009
New Revision: 747528
URL: http://svn.apache.org/viewvc?rev=747528&view=rev
Log:
Fixed issue with producer flow control in a cluster.
Producer flow control uses a Timer and other clock-based calculations to send
flow control commands.
These commands are not predictably ordered from the cluster's point of view.
Added getClusterOrderProxy() to SessionState. In a cluster it returns
a proxy that defers sending a command to the client until it is
multicast to the cluster. In a stand alone broker it is just the
normal proxy. Updated producer flow control to use this proxy.
Cluster flow control is turned off in shadow connections. Only the
directly connected node does flow control calculations and multicasts
the commands to send. All nodes send the commands through SessionState
to ensure consistent session state (e.g. command numbering).
Added:
qpid/trunk/qpid/cpp/src/qpid/cluster/McastFrameHandler.h (with props)
Modified:
qpid/trunk/qpid/cpp/src/cluster.mk
qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h
qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h
qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h
qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
qpid/trunk/qpid/cpp/src/tests/federated_cluster_test
qpid/trunk/qpid/cpp/src/tests/ssl_test
qpid/trunk/qpid/cpp/src/tests/start_cluster
Modified: qpid/trunk/qpid/cpp/src/cluster.mk
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.mk?rev=747528&r1=747527&r2=747528&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ qpid/trunk/qpid/cpp/src/cluster.mk Tue Feb 24 19:48:54 2009
@@ -70,6 +70,7 @@
qpid/cluster/FailoverExchange.h \
qpid/cluster/Multicaster.cpp \
qpid/cluster/Multicaster.h \
+ qpid/cluster/MulticastFrameHandler.h \
qpid/cluster/NoOpConnectionOutputHandler.h \
qpid/cluster/OutputInterceptor.cpp \
qpid/cluster/OutputInterceptor.h \
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h?rev=747528&r1=747527&r2=747528&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h Tue Feb 24 19:48:54
2009
@@ -48,8 +48,9 @@
heartbeatmax(120),
stagingThreshold(broker.getStagingThreshold()),
federationLink(true),
- clientSupportsThrottling(false)
- {}
+ clientSupportsThrottling(false),
+ clusterOrderOut(0)
+ {}
virtual ~ConnectionState () {}
@@ -75,7 +76,7 @@
const string& getFederationPeerTag() const { return federationPeerTag; }
std::vector<Url>& getKnownHosts() { return knownHosts; }
- void setClientThrottling() { clientSupportsThrottling = true; }
+ void setClientThrottling(bool set=true) { clientSupportsThrottling = set; }
bool getClientThrottling() const { return clientSupportsThrottling; }
Broker& getBroker() { return broker; }
@@ -86,11 +87,20 @@
//contained output tasks
sys::AggregateOutput outputTasks;
- sys::ConnectionOutputHandlerPtr& getOutput() { return out; }
+ sys::ConnectionOutputHandler& getOutput() { return out; }
framing::ProtocolVersion getVersion() const { return version; }
-
void setOutputHandler(qpid::sys::ConnectionOutputHandler* o) { out.set(o);
}
+ /**
+ * If the broker is part of a cluster, this is a handler provided
+ * by cluster code. It ensures consistent ordering of commands
+ * that are sent based on criteria that are not predictably
+ * ordered cluster-wide, e.g. a timer firing.
+ */
+ framing::FrameHandler* getClusterOrderOutput() { return clusterOrderOut; }
+ void setClusterOrderOutput(framing::FrameHandler& fh) { clusterOrderOut =
&fh; }
+
+
protected:
framing::ProtocolVersion version;
uint32_t framemax;
@@ -103,6 +113,7 @@
string federationPeerTag;
std::vector<Url> knownHosts;
bool clientSupportsThrottling;
+ framing::FrameHandler* clusterOrderOut;
};
}}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=747528&r1=747527&r2=747528&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Tue Feb 24 19:48:54
2009
@@ -34,7 +34,8 @@
SessionHandler::SessionHandler(Connection& c, ChannelId ch)
: amqp_0_10::SessionHandler(&c.getOutput(), ch),
connection(c),
- proxy(out)
+ proxy(out),
+ clusterOrderProxy(c.getClusterOrderOutput() ? new SetChannelProxy(ch,
c.getClusterOrderOutput()) : 0)
{}
SessionHandler::~SessionHandler() {}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h?rev=747528&r1=747527&r2=747528&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h Tue Feb 24 19:48:54
2009
@@ -54,6 +54,17 @@
framing::AMQP_ClientProxy& getProxy() { return proxy; }
const framing::AMQP_ClientProxy& getProxy() const { return proxy; }
+ /**
+ * If commands are sent based on the local time (e.g. in timers), they
don't have
+ * a well-defined ordering across cluster nodes.
+ * This proxy is for sending such commands. In a clustered broker it will
take steps
+ * to synchronize command order across the cluster. In a stand-alone broker
+ * it is just a synonym for getProxy()
+ */
+ framing::AMQP_ClientProxy& getClusterOrderProxy() {
+ return clusterOrderProxy.get() ? *clusterOrderProxy : proxy;
+ }
+
virtual void handleDetach();
// Overrides
@@ -69,9 +80,16 @@
virtual void readyToSend();
private:
+ struct SetChannelProxy : public framing::AMQP_ClientProxy { // Proxy that
sets the channel.
+ framing::ChannelHandler setChannel;
+ SetChannelProxy(uint16_t ch, framing::FrameHandler* out)
+ : framing::AMQP_ClientProxy(setChannel), setChannel(ch, out) {}
+ };
+
Connection& connection;
framing::AMQP_ClientProxy proxy;
std::auto_ptr<SessionState> session;
+ std::auto_ptr<SetChannelProxy> clusterOrderProxy;
};
}} // namespace qpid::broker
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=747528&r1=747527&r2=747528&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Tue Feb 24 19:48:54
2009
@@ -66,7 +66,7 @@
uint32_t maxRate = broker.getOptions().maxSessionRate;
if (maxRate) {
if (handler->getConnection().getClientThrottling()) {
- rateFlowcontrol = new RateFlowcontrol(maxRate);
+ rateFlowcontrol.reset(new RateFlowcontrol(maxRate));
} else {
QPID_LOG(warning, getId() << ": Unable to flow control client -
client doesn't support");
}
@@ -210,7 +210,6 @@
{}
void fire() {
- QPID_LOG(critical, "ScheduledCreditTask fired"); // FIXME aconway
2009-02-23: REMOVE
// This is the best we can currently do to avoid a destruction/fire
race
if (!isCancelled()) {
if ( !sessionState.processSendCredit(0) ) {
@@ -275,7 +274,8 @@
if ( msgs > 0 && rateFlowcontrol->flowStopped() ) {
QPID_LOG(warning, getId() << ": producer throttling violation");
// TODO: Probably do message.stop("") first time then disconnect
- getProxy().getMessage().stop("");
+ // See comment on getClusterOrderProxy() in .h file
+ getClusterOrderProxy().getMessage().stop("");
return true;
}
AbsTime now = AbsTime::now();
@@ -283,7 +283,7 @@
if (mgmtObject) mgmtObject->dec_clientCredit(msgs);
if ( sendCredit>0 ) {
QPID_LOG(debug, getId() << ": send producer credit " << sendCredit);
- getProxy().getMessage().flow("", 0, sendCredit);
+ getClusterOrderProxy().getMessage().flow("", 0, sendCredit);
rateFlowcontrol->sentCredit(now, sendCredit);
if (mgmtObject) mgmtObject->inc_clientCredit(sendCredit);
return true;
@@ -364,8 +364,9 @@
// Issue initial credit - use a heuristic here issue min of 300
messages or 1 secs worth
uint32_t credit = std::min(rateFlowcontrol->getRate(), 300U);
QPID_LOG(debug, getId() << ": Issuing producer message credit " <<
credit);
- getProxy().getMessage().setFlowMode("", 0);
- getProxy().getMessage().flow("", 0, credit);
+ // See comment on getClusterOrderProxy() in .h file
+ getClusterOrderProxy().getMessage().setFlowMode("", 0);
+ getClusterOrderProxy().getMessage().flow("", 0, credit);
rateFlowcontrol->sentCredit(AbsTime::now(), credit);
if (mgmtObject) mgmtObject->inc_clientCredit(credit);
}
@@ -373,4 +374,8 @@
Broker& SessionState::getBroker() { return broker; }
+framing::AMQP_ClientProxy& SessionState::getClusterOrderProxy() {
+ return handler->getClusterOrderProxy();
+}
+
}} // namespace qpid::broker
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=747528&r1=747527&r2=747528&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Tue Feb 24 19:48:54 2009
@@ -125,6 +125,15 @@
void sendAcceptAndCompletion();
+ /**
+ * If commands are sent based on the local time (e.g. in timers), they
don't have
+ * a well-defined ordering across cluster nodes.
+ * This proxy is for sending such commands. In a clustered broker it will
take steps
+ * to synchronize command order across the cluster. In a stand-alone broker
+ * it is just a synonym for getProxy()
+ */
+ framing::AMQP_ClientProxy& getClusterOrderProxy();
+
Broker& broker;
SessionHandler* handler;
sys::AbsTime expiry; // Used by SessionManager.
@@ -138,7 +147,7 @@
// State used for producer flow control (rate limited)
qpid::sys::Mutex rateLock;
- RateFlowcontrol* rateFlowcontrol;
+ boost::scoped_ptr<RateFlowcontrol> rateFlowcontrol;
boost::intrusive_ptr<TimerTask> flowControlTimer;
friend class SessionManager;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=747528&r1=747527&r2=747528&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Tue Feb 24 19:48:54 2009
@@ -62,7 +62,8 @@
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& wrappedId, ConnectionId myId)
: cluster(c), self(myId), catchUp(false), output(*this, out),
- connection(&output, cluster.getBroker(), wrappedId),
expectProtocolHeader(false)
+ connection(&output, cluster.getBroker(), wrappedId),
expectProtocolHeader(false),
+ mcastFrameHandler(cluster.getMulticast(), self)
{ init(); }
// Local connections
@@ -70,15 +71,20 @@
const std::string& wrappedId, MemberId myId, bool
isCatchUp, bool isLink)
: cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out),
connection(&output, cluster.getBroker(), wrappedId, isLink, catchUp ?
++catchUpId : 0),
- expectProtocolHeader(isLink)
+ expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(),
self)
{ init(); }
void Connection::init() {
QPID_LOG(debug, cluster << " new connection: " << *this);
- if (isLocalClient()) {
+ if (isLocalClient()) {
+ connection.setClusterOrderOutput(mcastFrameHandler); // Actively send
cluster-order frames from local node
cluster.addLocalConnection(this);
giveReadCredit(cluster.getReadMax());
}
+ else { // Shadow or
catch-up connection
+ connection.setClusterOrderOutput(nullFrameHandler); // Passive,
discard cluster-order frames
+ connection.setClientThrottling(false); // Disable client
throttling, done by active node.
+ }
}
void Connection::giveReadCredit(int credit) {
@@ -143,7 +149,15 @@
if (!framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection
contol.
&& !checkUnsupported(*f.frame.getBody())) // Unsupported operation.
{
- connection.received(const_cast<AMQFrame&>(f.frame)); // Pass to broker
connection.
+ // FIXME aconway 2009-02-24: Using the DATA/CONTROL
+ // distinction to distinguish incoming vs. outgoing frames is
+ // very unclear.
+ if (f.type == DATA) // incoming data frames to broker::Connection
+ connection.received(const_cast<AMQFrame&>(f.frame));
+ else { // outgoing data frame, send via SessionState
+ broker::SessionState* ss =
connection.getChannel(f.frame.getChannel()).getSession();
+ if (ss) ss->out(const_cast<AMQFrame&>(f.frame));
+ }
}
giveReadCredit(f.readCredit);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=747528&r1=747527&r2=747528&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Tue Feb 24 19:48:54 2009
@@ -27,6 +27,7 @@
#include "OutputInterceptor.h"
#include "NoOpConnectionOutputHandler.h"
#include "EventFrame.h"
+#include "McastFrameHandler.h"
#include "qpid/broker/Connection.h"
#include "qpid/amqp_0_10/Connection.h"
@@ -150,6 +151,10 @@
void giveReadCredit(int credit);
private:
+ struct NullFrameHandler : public framing::FrameHandler {
+ void handle(framing::AMQFrame&) {}
+ };
+
void init();
bool checkUnsupported(const framing::AMQBody& body);
void deliverClose();
@@ -174,6 +179,8 @@
framing::ChannelId currentChannel;
boost::shared_ptr<broker::TxBuffer> txBuffer;
bool expectProtocolHeader;
+ McastFrameHandler mcastFrameHandler;
+ NullFrameHandler nullFrameHandler;
static qpid::sys::AtomicValue<uint64_t> catchUpId;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp?rev=747528&r1=747527&r2=747528&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp Tue Feb 24 19:48:54 2009
@@ -74,14 +74,17 @@
return e;
}
-Event Event::control(const framing::AMQBody& body, const ConnectionId& cid) {
- framing::AMQFrame f(body);
+Event Event::control(const framing::AMQFrame& f, const ConnectionId& cid) {
Event e(CONTROL, cid, f.encodedSize());
Buffer buf(e);
f.encode(buf);
return e;
}
+Event Event::control(const framing::AMQBody& body, const ConnectionId& cid) {
+ return control(framing::AMQFrame(body), cid);
+}
+
iovec Event::toIovec() {
encodeHeader();
iovec iov = { const_cast<char*>(getStore()), getStoreSize() };
@@ -110,10 +113,13 @@
static const char* EVENT_TYPE_NAMES[] = { "data", "control" };
+std::ostream& operator << (std::ostream& o, EventType t) {
+ return o << EVENT_TYPE_NAMES[t];
+}
+
std::ostream& operator << (std::ostream& o, const EventHeader& e) {
o << "[event " << e.getConnectionId() << "/" << e.getSequence()
- << " " << EVENT_TYPE_NAMES[e.getType()]
- << " " << e.getSize() << " bytes]";
+ << " " << e.getType() << " " << e.getSize() << " bytes]";
return o;
}
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h?rev=747528&r1=747527&r2=747528&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h Tue Feb 24 19:48:54 2009
@@ -34,6 +34,7 @@
namespace framing {
class AMQBody;
+class AMQFrame;
class Buffer;
}
@@ -83,8 +84,11 @@
/** Create an event copied from delivered data. */
static Event decodeCopy(const MemberId& m, framing::Buffer&);
- /** Create an event containing a control */
+ /** Create a control event. */
static Event control(const framing::AMQBody&, const ConnectionId&);
+
+ /** Create a control event. */
+ static Event control(const framing::AMQFrame&, const ConnectionId&);
// Data excluding header.
char* getData() { return store + HEADER_SIZE; }
@@ -105,6 +109,7 @@
};
std::ostream& operator << (std::ostream&, const EventHeader&);
+
}} // namespace qpid::cluster
#endif /*!QPID_CLUSTER_EVENT_H*/
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp?rev=747528&r1=747527&r2=747528&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp Tue Feb 24 19:48:54 2009
@@ -27,13 +27,13 @@
EventFrame::EventFrame() : sequence(0) {}
EventFrame::EventFrame(const EventHeader& e, const framing::AMQFrame& f, int
rc)
- : connectionId(e.getConnectionId()), frame(f), sequence(e.getSequence()),
readCredit(rc)
+ : connectionId(e.getConnectionId()), frame(f), sequence(e.getSequence()),
readCredit(rc), type(e.getType())
{
QPID_LATENCY_INIT(frame);
}
std::ostream& operator<<(std::ostream& o, const EventFrame& e) {
- return o << e.connectionId << "/" << e.sequence << " " << e.frame << "
rc=" << e.readCredit;
+ return o << e.connectionId << "/" << e.sequence << " " << e.frame << "
rc=" << e.readCredit << " type=" << e.type;
}
}} // namespace qpid::cluster
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h?rev=747528&r1=747527&r2=747528&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h Tue Feb 24 19:48:54 2009
@@ -57,7 +57,8 @@
ConnectionId connectionId;
framing::AMQFrame frame;
uint64_t sequence;
- int readCredit; // last frame in an event, give credit when
processed.
+ int readCredit; ///< last frame in an event, give credit when processed.
+ EventType type;
};
std::ostream& operator<<(std::ostream& o, const EventFrame& e);
Added: qpid/trunk/qpid/cpp/src/qpid/cluster/McastFrameHandler.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/McastFrameHandler.h?rev=747528&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/McastFrameHandler.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/McastFrameHandler.h Tue Feb 24
19:48:54 2009
@@ -0,0 +1,46 @@
+#ifndef QPID_CLUSTER_MCASTFRAMEHANDLER_H
+#define QPID_CLUSTER_MCASTFRAMEHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+#include "Multicaster.h"
+#include "qpid/framing/FrameHandler.h"
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * A frame handler that multicasts frames as CONTROL events.
+ */
+class McastFrameHandler : public framing::FrameHandler
+{
+ public:
+ McastFrameHandler(Multicaster& m, const ConnectionId& cid) : mcast(m),
connection(cid) {}
+ void handle(framing::AMQFrame& frame) { mcast.mcastControl(frame,
connection); }
+ private:
+ Multicaster& mcast;
+ ConnectionId connection;
+};
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_MCASTFRAMEHANDLER_H*/
Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/McastFrameHandler.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/McastFrameHandler.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp?rev=747528&r1=747527&r2=747528&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp Tue Feb 24 19:48:54
2009
@@ -24,6 +24,7 @@
#include "qpid/log/Statement.h"
#include "qpid/sys/LatencyMetric.h"
#include "qpid/framing/AMQBody.h"
+#include "qpid/framing/AMQFrame.h"
namespace qpid {
namespace cluster {
@@ -43,6 +44,11 @@
mcast(Event::control(body, id));
}
+void Multicaster::mcastControl(const framing::AMQFrame& frame, const
ConnectionId& id) {
+ QPID_LOG(trace, "MCAST " << id << ": " << frame);
+ mcast(Event::control(frame, id));
+}
+
void Multicaster::mcastBuffer(const char* data, size_t size, const
ConnectionId& id) {
Event e(DATA, id, size);
memcpy(e.getData(), data, size);
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h?rev=747528&r1=747527&r2=747528&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h Tue Feb 24 19:48:54 2009
@@ -50,6 +50,7 @@
boost::function<void()> onError
);
void mcastControl(const framing::AMQBody& controlBody, const
ConnectionId&);
+ void mcastControl(const framing::AMQFrame& controlFrame, const
ConnectionId&);
void mcastBuffer(const char*, size_t, const ConnectionId&);
void mcast(const Event& e);
/** End holding mode, held events are mcast */
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h?rev=747528&r1=747527&r2=747528&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/types.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/types.h Tue Feb 24 19:48:54 2009
@@ -78,6 +78,8 @@
std::ostream& operator<<(std::ostream&, const ConnectionId&);
+std::ostream& operator << (std::ostream&, EventType);
+
}} // namespace qpid::cluster
#endif /*!QPID_CLUSTER_TYPES_H*/
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h?rev=747528&r1=747527&r2=747528&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h Tue Feb 24
19:48:54 2009
@@ -30,7 +30,7 @@
/**
* A ConnectionOutputHandler that delegates to another
* ConnectionOutputHandler. Allows the "real" ConnectionOutputHandler
- * to be changed modified without updating all the pointers/references
+ * to be changed without updating all the pointers/references
* using the ConnectionOutputHandlerPtr
*/
class ConnectionOutputHandlerPtr : public ConnectionOutputHandler
Modified: qpid/trunk/qpid/cpp/src/tests/federated_cluster_test
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/federated_cluster_test?rev=747528&r1=747527&r2=747528&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/federated_cluster_test (original)
+++ qpid/trunk/qpid/cpp/src/tests/federated_cluster_test Tue Feb 24 19:48:54
2009
@@ -22,7 +22,7 @@
# Test reliability of the replication feature in the face of link
# failures:
srcdir=`dirname $0`
-PYTHON_DIR=${srcdir}/../../../python
+PYTHON_DIR=$srcdir/../../../python
trap stop_brokers EXIT
@@ -37,7 +37,7 @@
unset BROKER_A
fi
if [[ $NODE_1 ]] ; then
- ./stop_cluster
+ $srcdir/stop_cluster
unset NODE_1
fi
}
Modified: qpid/trunk/qpid/cpp/src/tests/ssl_test
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ssl_test?rev=747528&r1=747527&r2=747528&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ssl_test (original)
+++ qpid/trunk/qpid/cpp/src/tests/ssl_test Tue Feb 24 19:48:54 2009
@@ -39,8 +39,7 @@
}
start_broker() {
- ../qpidd --daemon --transport ssl --port 0 --ssl-port 0 --no-data-dir
--no-module-dir --auth no --config $CONFIG --load-module ../.libs/ssl.so
--ssl-cert-db $CERT_DIR --ssl-cert-password-file $CERT_PW_FILE > qpidd.port
- PORT=`cat qpidd.port`
+ PORT=`../qpidd --daemon --transport ssl --port 0 --ssl-port 0
--no-data-dir --no-module-dir --auth no --config $CONFIG --load-module
../.libs/ssl.so --ssl-cert-db $CERT_DIR --ssl-cert-password-file $CERT_PW_FILE`
}
stop_broker() {
Modified: qpid/trunk/qpid/cpp/src/tests/start_cluster
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/start_cluster?rev=747528&r1=747527&r2=747528&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/start_cluster (original)
+++ qpid/trunk/qpid/cpp/src/tests/start_cluster Tue Feb 24 19:48:54 2009
@@ -32,7 +32,7 @@
rm -f cluster*.log
SIZE=${1:-1}; shift
CLUSTER=`pwd` # Cluster name=pwd, avoid clashes.
-OPTS="-d --no-module-dir --load-module ../.libs/cluster.so
--cluster-name=$CLUSTER --no-data-dir --auth=no $*"
+OPTS="-d --no-module-dir --load-module ../.libs/cluster.so
--cluster-name=$CLUSTER --no-data-dir --auth=no $@"
for (( i=0; i<SIZE; ++i )); do
PORT=`with_ais_group ../qpidd -p0 --log-to-file=cluster$i.log $OPTS` ||
exit 1
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]