Author: aconway
Date: Wed Dec 17 10:05:30 2008
New Revision: 727455
URL: http://svn.apache.org/viewvc?rev=727455&view=rev
Log:
src/qpid/amqp_0_10/Connection.cpp: allow encoding to be concurrent with adding
new frames.
Modified:
qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
Modified: qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp?rev=727455&r1=727454&r2=727455&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp Wed Dec 17 10:05:30
2008
@@ -69,7 +69,11 @@
}
size_t Connection::encode(const char* buffer, size_t size) {
- Mutex::ScopedLock l(frameQueueLock);
+ { // Swap frameQueue data into workQueue to avoid holding lock while we
encode.
+ Mutex::ScopedLock l(frameQueueLock);
+ assert(workQueue.empty());
+ workQueue.swap(frameQueue);
+ }
framing::Buffer out(const_cast<char*>(buffer), size);
if (!isClient && !initialized) {
framing::ProtocolInitiation pi(getVersion());
@@ -78,16 +82,24 @@
QPID_LOG(trace, "SENT " << identifier << " INIT(" << pi << ")");
}
size_t frameSize=0;
- while (!frameQueue.empty() &&
((frameSize=frameQueue.front().encodedSize()) <= out.available())) {
- frameQueue.front().encode(out);
- QPID_LOG(trace, "SENT [" << identifier << "]: " << frameQueue.front());
- frameQueue.pop_front();
- buffered -= frameSize;
- if (frameQueue.empty() && out.available() > 0) connection->doOutput();
+ size_t encoded=0;
+ while (!workQueue.empty() && ((frameSize=workQueue.front().encodedSize())
<= out.available())) {
+ workQueue.front().encode(out);
+ QPID_LOG(trace, "SENT [" << identifier << "]: " << workQueue.front());
+ workQueue.pop_front();
+ encoded += frameSize;
+ if (workQueue.empty() && out.available() > 0) connection->doOutput();
+ }
+ assert(workQueue.empty() || workQueue.front().encodedSize() <= size);
+ if (!workQueue.empty() && workQueue.front().encodedSize() > size)
+ throw InternalErrorException(QPID_MSG("Frame too large for buffer."));
+ {
+ Mutex::ScopedLock l(frameQueueLock);
+ buffered -= encoded;
+ // Put back any frames we did not encode.
+ frameQueue.insert(frameQueue.begin(), workQueue.begin(),
workQueue.end());
+ workQueue.clear();
}
- assert(frameQueue.empty() || frameQueue.front().encodedSize() <= size);
- if (!frameQueue.empty() && frameQueue.front().encodedSize() > size)
- throw InternalErrorException(QPID_MSG("Could not write frame, too
large for buffer."));
return out.getPosition();
}
Modified: qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h?rev=727455&r1=727454&r2=727455&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h Wed Dec 17 10:05:30 2008
@@ -45,6 +45,7 @@
typedef std::deque<framing::AMQFrame> FrameQueue;
FrameQueue frameQueue;
+ FrameQueue workQueue;
bool frameQueueClosed;
mutable sys::Mutex frameQueueLock;
sys::OutputControl& output;