Author: aconway
Date: Tue Feb 10 21:42:10 2009
New Revision: 743114
URL: http://svn.apache.org/viewvc?rev=743114&view=rev
Log:
Fix cluster flow control bug: hang with large messages (>frame-max) and low
--cluster-read-max.
Modified:
qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/UnknownType.h
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionDecoder.h
qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h
qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h
qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
qpid/trunk/qpid/cpp/src/tests/latencytest.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/UnknownType.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/UnknownType.h?rev=743114&r1=743113&r2=743114&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/UnknownType.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/UnknownType.h Tue Feb 10 21:42:10 2009
@@ -21,9 +21,9 @@
* under the License.
*
*/
+#include "qpid/sys/IntegerTypes.h"
#include <vector>
#include <iosfwd>
-#include <stdint.h>
namespace qpid {
namespace amqp_0_10 {
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=743114&r1=743113&r2=743114&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Feb 10 21:42:10 2009
@@ -103,7 +103,7 @@
"Error delivering frames",
poller),
connections(*this),
- decoder(boost::bind(&PollableFrameQueue::push, &deliverFrameQueue, _1)),
+ decoder(boost::bind(&PollableFrameQueue::push, &deliverFrameQueue, _1), connections),
expiryPolicy(new ExpiryPolicy(boost::bind(&Cluster::isLeader, this),
mcast, myId, broker.getTimer())),
frameId(0),
initialized(false),
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=743114&r1=743113&r2=743114&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Tue Feb 10 21:42:10 2009
@@ -62,14 +62,14 @@
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& wrappedId, ConnectionId myId)
: cluster(c), self(myId), catchUp(false), output(*this, out),
- connection(&output, cluster.getBroker(), wrappedId), readCredit(0), expectProtocolHeader(false)
+ connection(&output, cluster.getBroker(), wrappedId), expectProtocolHeader(false)
{ init(); }
// Local connections
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& wrappedId, MemberId myId, bool isCatchUp, bool isLink)
: cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out),
- connection(&output, cluster.getBroker(), wrappedId, isLink, catchUp ? ++catchUpId : 0), readCredit(0),
+ connection(&output, cluster.getBroker(), wrappedId, isLink, catchUp ? ++catchUpId : 0),
expectProtocolHeader(isLink)
{ init(); }
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=743114&r1=743113&r2=743114&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Tue Feb 10 21:42:10 2009
@@ -146,6 +146,8 @@
// Encoded queue/exchange replication.
void queue(const std::string& encoded);
void exchange(const std::string& encoded);
+
+ void giveReadCredit(int credit) { output.giveReadCredit(credit); }
private:
void init();
@@ -171,7 +173,6 @@
framing::SequenceNumber deliverSeq;
framing::ChannelId currentChannel;
boost::shared_ptr<broker::TxBuffer> txBuffer;
- int readCredit;
bool expectProtocolHeader;
static qpid::sys::AtomicValue<uint64_t> catchUpId;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp?rev=743114&r1=743113&r2=743114&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp Tue Feb 10 21:42:10 2009
@@ -21,28 +21,34 @@
#include "ConnectionDecoder.h"
#include "EventFrame.h"
+#include "ConnectionMap.h"
namespace qpid {
namespace cluster {
using namespace framing;
-ConnectionDecoder::ConnectionDecoder(const Handler& h) : handler(h), readCredit(0) {}
+ConnectionDecoder::ConnectionDecoder(const Handler& h) : handler(h) {}
-void ConnectionDecoder::decode(const EventHeader& eh, const void* data) {
+void ConnectionDecoder::decode(const EventHeader& eh, const void* data, ConnectionMap& map) {
assert(eh.getType() == DATA); // Only handle connection data events.
const char* cp = static_cast<const char*>(data);
Buffer buf(const_cast<char*>(cp), eh.getSize());
- // Set read credit on the last frame in the event.
- ++readCredit; // One credit per event = connection read buffer.
- if (decoder.decode(buf)) { // Decoded a frame
+ if (decoder.decode(buf)) { // Decoded a frame
AMQFrame frame(decoder.frame);
while (decoder.decode(buf)) {
handler(EventFrame(eh, frame));
frame = decoder.frame;
}
- handler(EventFrame(eh, frame, readCredit));
- readCredit = 0; // Reset credit for next event.
+ handler(EventFrame(eh, frame, 1)); // Set read-credit on the last frame.
+ }
+ else {
+ // We must give 1 unit read credit per event.
+ // This event does not contain any complete frames so
+ // we must give read credit directly.
+ ConnectionPtr connection = map.getLocal(eh.getConnectionId());
+ if (connection)
+ connection->giveReadCredit(1);
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionDecoder.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionDecoder.h?rev=743114&r1=743113&r2=743114&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionDecoder.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionDecoder.h Tue Feb 10 21:42:10 2009
@@ -30,6 +30,8 @@
class EventHeader;
class EventFrame;
+class ConnectionMap;
+
/**
* Decodes delivered connection data Event's as EventFrame's for a
* connection replica, local or shadow. Manages state for frame
@@ -47,12 +49,11 @@
/** Takes EventHeader + data rather than Event so that the caller can
* pass a pointer to connection data or a CPG buffer directly without copy.
*/
- void decode(const EventHeader& eh, const void* data);
+ void decode(const EventHeader& eh, const void* data, ConnectionMap& connections);
private:
Handler handler;
framing::FrameDecoder decoder;
- int readCredit;
};
}} // namespace qpid::cluster
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp?rev=743114&r1=743113&r2=743114&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp Tue Feb 10 21:42:10 2009
@@ -62,6 +62,12 @@
return i->second;
}
+ConnectionMap::ConnectionPtr ConnectionMap::getLocal(const ConnectionId& id) {
+ if (id.getMember() != cluster.getId()) return 0;
+ Map::const_iterator i = map.find(id);
+ return i == map.end() ? 0 : i->second;
+}
+
ConnectionMap::Vector ConnectionMap::values() const {
Vector result(map.size());
std::transform(map.begin(), map.end(), result.begin(),
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h?rev=743114&r1=743113&r2=743114&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h Tue Feb 10 21:42:10 2009
@@ -60,6 +60,9 @@
*/
ConnectionPtr get(const ConnectionId& id);
+ /** If ID is a local connection and in the map return it, else return 0 */
+ ConnectionPtr getLocal(const ConnectionId& id);
+
/** Get connections for sending an update. */
Vector values() const;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp?rev=743114&r1=743113&r2=743114&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp Tue Feb 10 21:42:10 2009
@@ -29,12 +29,12 @@
using namespace framing;
-Decoder::Decoder(const Handler& h) : handler(h) {}
+Decoder::Decoder(const Handler& h, ConnectionMap& cm) : handler(h), connections(cm) {}
void Decoder::decode(const EventHeader& eh, const void* data) {
ConnectionId id = eh.getConnectionId();
std::pair<Map::iterator, bool> ib = map.insert(id, new ConnectionDecoder(handler));
- ptr_map_ptr(ib.first)->decode(eh, data);
+ ptr_map_ptr(ib.first)->decode(eh, data, connections);
}
void Decoder::erase(const ConnectionId& c) {
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h?rev=743114&r1=743113&r2=743114&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h Tue Feb 10 21:42:10 2009
@@ -30,6 +30,7 @@
namespace cluster {
class EventHeader;
+class ConnectionMap;
/**
* Holds a map of ConnectionDecoders. Decodes Events into EventFrames
@@ -42,7 +43,7 @@
public:
typedef boost::function<void(const EventFrame&)> Handler;
- Decoder(const Handler& h);
+ Decoder(const Handler& h, ConnectionMap&);
/** Takes EventHeader + data rather than Event so that the caller can
* pass a pointer to connection data or a CPG buffer directly without copy.
@@ -56,7 +57,9 @@
typedef boost::ptr_map<ConnectionId, ConnectionDecoder> Map;
Handler handler;
Map map;
+ ConnectionMap& connections;
};
+
}} // namespace qpid::cluster
#endif /*!QPID_CLUSTER_DECODER_H*/
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h?rev=743114&r1=743113&r2=743114&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/types.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/types.h Tue Feb 10 21:42:10 2009
@@ -24,11 +24,12 @@
#include "config.h"
#include "qpid/Url.h"
+#include "qpid/sys/IntegerTypes.h"
#include <boost/intrusive_ptr.hpp>
#include <utility>
#include <iosfwd>
#include <string>
-#include <stdint.h>
+
extern "C" {
#if defined (HAVE_OPENAIS_CPG_H)
Modified: qpid/trunk/qpid/cpp/src/tests/latencytest.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/latencytest.cpp?rev=743114&r1=743113&r2=743114&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/latencytest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/latencytest.cpp Tue Feb 10 21:42:10 2009
@@ -223,6 +223,7 @@
if (msgCount) {
std::cout << "Warning: found " << msgCount << " msgs on " << queue << ". Purging..." << std::endl;
session.queuePurge(arg::queue=queue);
+ session.sync();
}
SubscriptionSettings settings;
if (opts.prefetch) {
@@ -245,10 +246,8 @@
{
++count;
uint64_t sentAt = msg.getDeliveryProperties().getTimestamp();
- //uint64_t sentAt = msg.getHeaders().getTimestamp("sent-at");// TODO: add support for uint64_t as a field table type
uint64_t receivedAt = current_time();
- //std::cerr << "Latency: " << (receivedAt - sentAt) << std::endl;
stats.update(((double) (receivedAt - sentAt)) / TIME_MSEC);
if (!opts.rate && count >= opts.count) {
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]