Author: aconway
Date: Wed Feb 11 17:29:42 2009
New Revision: 743416
URL: http://svn.apache.org/viewvc?rev=743416&view=rev
Log: Fix race condition with read-credit.
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=743416&r1=743415&r2=743416&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Wed Feb 11 17:29:42 2009 @@ -210,6 +210,7 @@ remainingSize = size - pi.encodedSize(); } else { QPID_LOG(debug, "Not enough data for protocol header on outgoing clustered link"); + giveReadCredit(1); // We're not going to mcast so give read credit now. return 0; } } Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp?rev=743416&r1=743415&r2=743416&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp Wed Feb 11 17:29:42 2009 @@ -66,6 +66,7 @@ ConnectionMap::ConnectionPtr ConnectionMap::getLocal(const ConnectionId& id) { if (id.getMember() != cluster.getId()) return 0; Map::const_iterator i = map.find(id); + assert(i != map.end()); // FIXME aconway 2009-02-11: remove or exception. return i == map.end() ? 
0 : i->second; } Modified: qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h?rev=743416&r1=743415&r2=743416&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h Wed Feb 11 17:29:42 2009 @@ -139,6 +139,7 @@ virtual void queueWriteClose() = 0; virtual bool writeQueueEmpty() = 0; virtual void startReading() = 0; + virtual void stopReading() = 0; virtual void requestCallback(RequestCallback) = 0; virtual BufferBase* getQueuedBuffer() = 0; Modified: qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp?rev=743416&r1=743415&r2=743416&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp Wed Feb 11 17:29:42 2009 @@ -145,8 +145,10 @@ // Lock and retest credit to make sure we don't race with increasing credit ScopedLock<Mutex> l(creditLock); assert(readCredit.get() >= 0); - if (readCredit.get() == 0) + if (readCredit.get() == 0) { + aio->stopReading(); return false; + } } } return true; Modified: qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp?rev=743416&r1=743415&r2=743416&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp Wed Feb 11 17:29:42 2009 @@ -267,6 +267,7 @@ virtual void queueWriteClose(); virtual bool writeQueueEmpty(); virtual void startReading(); + virtual void stopReading(); virtual void requestCallback(RequestCallback); virtual BufferBase* getQueuedBuffer(); @@ 
-389,6 +390,10 @@ DispatchHandle::rewatchRead(); } +void AsynchIO::stopReading() { + DispatchHandle::unwatchRead(); +} + void AsynchIO::requestCallback(RequestCallback callback) { // TODO creating a function object every time isn't all that // efficient - if this becomes heavily used do something better (what?) @@ -439,8 +444,7 @@ readTotal += rc; if (!readCallback(*this, buff)) { - // We were told to flow control reading at this point - h.unwatchRead(); + // We have been flow controlled. break; } Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp?rev=743416&r1=743415&r2=743416&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp Wed Feb 11 17:29:42 2009 @@ -286,6 +286,7 @@ virtual void queueWriteClose(); virtual bool writeQueueEmpty(); virtual void startReading(); + virtual void stopReading(); virtual void requestCallback(RequestCallback); /** @@ -534,6 +535,15 @@ return; } +// stopReading was added to prevent a race condition with read-credit on Linux. +// It may or may not be required on windows. +// +// AsynchIOHandler::readbuff() calls stopReading() inside the same +// critical section that protects startReading() in +// AsynchIOHandler::giveReadCredit(). +// +void AsynchIO::stopReading() {} + // Queue the specified callback for invocation from an I/O thread. void AsynchIO::requestCallback(RequestCallback callback) { // This method is generally called from a processing thread; transfer --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org