Author: aconway
Date: Wed Feb 11 17:29:42 2009
New Revision: 743416

URL: http://svn.apache.org/viewvc?rev=743416&view=rev
Log:
Fix race condition with read-credit.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h
    qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=743416&r1=743415&r2=743416&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Wed Feb 11 17:29:42 2009
@@ -210,6 +210,7 @@
                 remainingSize = size - pi.encodedSize();
             } else {
                 QPID_LOG(debug, "Not enough data for protocol header on 
outgoing clustered link");
+                giveReadCredit(1); // We're not going to mcast so give read 
credit now.
                 return 0;
             }
         }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp?rev=743416&r1=743415&r2=743416&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp Wed Feb 11 17:29:42 
2009
@@ -66,6 +66,7 @@
 ConnectionMap::ConnectionPtr ConnectionMap::getLocal(const ConnectionId& id) {
     if (id.getMember() != cluster.getId()) return 0;
     Map::const_iterator i = map.find(id);
+    assert(i != map.end());     // FIXME aconway 2009-02-11: remove or 
exception.
     return i == map.end() ? 0 : i->second;
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h?rev=743416&r1=743415&r2=743416&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h Wed Feb 11 17:29:42 2009
@@ -139,6 +139,7 @@
     virtual void queueWriteClose() = 0;
     virtual bool writeQueueEmpty() = 0;
     virtual void startReading() = 0;
+    virtual void stopReading() = 0;
     virtual void requestCallback(RequestCallback) = 0;
     virtual BufferBase* getQueuedBuffer() = 0;
 

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp?rev=743416&r1=743415&r2=743416&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp Wed Feb 11 17:29:42 
2009
@@ -145,8 +145,10 @@
             // Lock and retest credit to make sure we don't race with 
increasing credit
             ScopedLock<Mutex> l(creditLock);
             assert(readCredit.get() >= 0);
-            if (readCredit.get() == 0)
+            if (readCredit.get() == 0) {
+                aio->stopReading();
                 return false;
+            }
         }
     }
     return true;

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp?rev=743416&r1=743415&r2=743416&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp Wed Feb 11 17:29:42 2009
@@ -267,6 +267,7 @@
     virtual void queueWriteClose();
     virtual bool writeQueueEmpty();
     virtual void startReading();
+    virtual void stopReading();
     virtual void requestCallback(RequestCallback);
     virtual BufferBase* getQueuedBuffer();
 
@@ -389,6 +390,10 @@
     DispatchHandle::rewatchRead();
 }
 
+void AsynchIO::stopReading() {
+    DispatchHandle::unwatchRead();
+}
+
 void AsynchIO::requestCallback(RequestCallback callback) {
     // TODO creating a function object every time isn't all that
     // efficient - if this becomes heavily used do something better (what?)
@@ -439,8 +444,7 @@
                 readTotal += rc;
 
                 if (!readCallback(*this, buff)) {
-                    // We were told to flow control reading at this point
-                    h.unwatchRead();
+                    // We have been flow controlled.
                     break;
                 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp?rev=743416&r1=743415&r2=743416&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp Wed Feb 11 17:29:42 
2009
@@ -286,6 +286,7 @@
     virtual void queueWriteClose();
     virtual bool writeQueueEmpty();
     virtual void startReading();
+    virtual void stopReading();
     virtual void requestCallback(RequestCallback);
 
     /**
@@ -534,6 +535,15 @@
     return;
 }
 
+// stopReading was added to prevent a race condition with read-credit on Linux.
+// It may or may not be required on windows.
+// 
+// AsynchIOHandler::readbuff() calls stopReading() inside the same
+// critical section that protects startReading() in
+// AsynchIOHandler::giveReadCredit().
+// 
+void AsynchIO::stopReading() {}
+
 // Queue the specified callback for invocation from an I/O thread.
 void AsynchIO::requestCallback(RequestCallback callback) {
     // This method is generally called from a processing thread; transfer



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to