Author: shuston
Date: Sat May  5 20:23:48 2012
New Revision: 1334484

URL: http://svn.apache.org/viewvc?rev=1334484&view=rev
Log:
Merge in QPID-3759 from trunk and fixes QPID-3982.

Modified:
    
qpid/branches/0.8-release-candidates/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
    
qpid/branches/0.8-release-candidates/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp

Modified: 
qpid/branches/0.8-release-candidates/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.8-release-candidates/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp?rev=1334484&r1=1334483&r2=1334484&view=diff
==============================================================================
--- 
qpid/branches/0.8-release-candidates/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp 
(original)
+++ 
qpid/branches/0.8-release-candidates/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp 
Sat May  5 20:23:48 2012
@@ -46,16 +46,13 @@ namespace {
 
 /*
  * The function pointers for AcceptEx and ConnectEx need to be looked up
- * at run time. Make sure this is done only once.
+ * at run time.
  */
-boost::once_flag lookUpAcceptExOnce = BOOST_ONCE_INIT;
-LPFN_ACCEPTEX fnAcceptEx = 0;
-typedef void (*lookUpFunc)(const qpid::sys::Socket &);
-
-void lookUpAcceptEx() {
-    SOCKET h = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+const LPFN_ACCEPTEX lookUpAcceptEx(const qpid::sys::Socket& s) {
+    SOCKET h = toSocketHandle(s);
     GUID guidAcceptEx = WSAID_ACCEPTEX;
     DWORD dwBytes = 0;
+    LPFN_ACCEPTEX fnAcceptEx;
     WSAIoctl(h,
              SIO_GET_EXTENSION_FUNCTION_POINTER,
              &guidAcceptEx,
@@ -65,9 +62,9 @@ void lookUpAcceptEx() {
              &dwBytes,
              NULL,
              NULL);
-    closesocket(h);
     if (fnAcceptEx == 0)
         throw qpid::Exception(QPID_MSG("Failed to look up AcceptEx"));
+    return fnAcceptEx;
 }
 
 }
@@ -94,18 +91,15 @@ private:
 
     AsynchAcceptor::Callback acceptedCallback;
     const Socket& socket;
+    const LPFN_ACCEPTEX fnAcceptEx;
 };
 
 AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback)
   : acceptedCallback(callback),
-    socket(s) {
+    socket(s),
+    fnAcceptEx(lookUpAcceptEx(s)) {
 
     s.setNonblocking();
-#if (BOOST_VERSION >= 103500)   /* boost 1.35 or later reversed the args */
-    boost::call_once(lookUpAcceptExOnce, lookUpAcceptEx);
-#else
-    boost::call_once(lookUpAcceptEx, lookUpAcceptExOnce);
-#endif
 }
 
 AsynchAcceptor::~AsynchAcceptor()
@@ -114,7 +108,8 @@ AsynchAcceptor::~AsynchAcceptor()
 }
 
 void AsynchAcceptor::start(Poller::shared_ptr poller) {
-    poller->monitorHandle(PollerHandle(socket), Poller::INPUT);
+    PollerHandle ph = PollerHandle(socket);
+    poller->monitorHandle(ph, Poller::INPUT);
     restart ();
 }
 
@@ -124,14 +119,14 @@ void AsynchAcceptor::restart(void) {
                                                         this,
                                                         
toSocketHandle(socket));
     BOOL status;
-    status = ::fnAcceptEx(toSocketHandle(socket),
-                          toSocketHandle(*result->newSocket),
-                          result->addressBuffer,
-                          0,
-                          AsynchAcceptResult::SOCKADDRMAXLEN,
-                          AsynchAcceptResult::SOCKADDRMAXLEN,
-                          &bytesReceived,
-                          result->overlapped());
+    status = fnAcceptEx(toSocketHandle(socket),
+                        toSocketHandle(*result->newSocket),
+                        result->addressBuffer,
+                        0,
+                        AsynchAcceptResult::SOCKADDRMAXLEN,
+                        AsynchAcceptResult::SOCKADDRMAXLEN,
+                        &bytesReceived,
+                        result->overlapped());
     QPID_WINDOWS_CHECK_ASYNC_START(status);
 }
 
@@ -154,7 +149,7 @@ void AsynchAcceptResult::success(size_t 
     delete this;
 }
 
-void AsynchAcceptResult::failure(int status) {
+void AsynchAcceptResult::failure(int /*status*/) {
     //if (status != WSA_OPERATION_ABORTED)
     // Can there be anything else?  ;
     delete this;
@@ -177,7 +172,7 @@ private:
 
 public:
     AsynchConnector(const Socket& socket,
-                    std::string hostname,
+                    std::string& hostname,
                     uint16_t port,
                     ConnectedCallback connCb,
                     FailedCallback failCb = 0);
@@ -185,7 +180,7 @@ public:
 };
 
 AsynchConnector::AsynchConnector(const Socket& sock,
-                                 std::string hname,
+                                 std::string& hname,
                                  uint16_t p,
                                  ConnectedCallback connCb,
                                  FailedCallback failCb) :
@@ -294,6 +289,8 @@ private:
     volatile LONG opsInProgress;
     // Is there a write in progress?
     volatile bool writeInProgress;
+    // Or a read?
+    volatile bool readInProgress;
     // Deletion requested, but there are callbacks in progress.
     volatile bool queuedDelete;
     // Socket close requested, but there are operations in progress.
@@ -347,6 +344,11 @@ private:
      * Called when there's a completion to process.
      */
     void completion(AsynchIoResult *result);
+
+    /**
+     * Helper function to facilitate the close operation
+     */
+    void cancelRead();
 };
 
 // This is used to encapsulate pure callbacks into a handle
@@ -375,6 +377,7 @@ AsynchIO::AsynchIO(const Socket& s,
     socket(s),
     opsInProgress(0),
     writeInProgress(false),
+    readInProgress(false),
     queuedDelete(false),
     queuedClose(false),
     working(false) {
@@ -392,26 +395,30 @@ AsynchIO::~AsynchIO() {
 }
 
 void AsynchIO::queueForDeletion() {
-    queuedDelete = true;
-    if (opsInProgress > 0) {
-        QPID_LOG(info, "Delete AsynchIO queued; ops in progress");
-        // AsynchIOHandler calls this then deletes itself; don't do any more
-        // callbacks.
-        readCallback = 0;
-        eofCallback = 0;
-        disCallback = 0;
-        closedCallback = 0;
-        emptyCallback = 0;
-        idleCallback = 0;
-    }
-    else {
-        delete this;
+    {
+        ScopedLock<Mutex> l(completionLock);
+        assert(!queuedDelete);
+        queuedDelete = true;
+        if (working || opsInProgress > 0) {
+            QPID_LOG(info, "Delete AsynchIO queued; ops in progress");
+            // AsynchIOHandler calls this then deletes itself; don't do any 
more
+            // callbacks.
+            readCallback = 0;
+            eofCallback = 0;
+            disCallback = 0;
+            closedCallback = 0;
+            emptyCallback = 0;
+            idleCallback = 0;
+            return;
+        }
     }
+    delete this;
 }
 
 void AsynchIO::start(Poller::shared_ptr poller0) {
+    PollerHandle ph = PollerHandle(socket);
     poller = poller0;
-    poller->monitorHandle(PollerHandle(socket), Poller::INPUT);
+    poller->monitorHandle(ph, Poller::INPUT);
     if (writeQueue.size() > 0)  // Already have data queued for write
         notifyPendingWrite();
     startReading();
@@ -453,9 +460,14 @@ void AsynchIO::notifyPendingWrite() {
 }
 
 void AsynchIO::queueWriteClose() {
-    queuedClose = true;
-    if (!writeInProgress)
-        notifyPendingWrite();
+    {
+        ScopedLock<Mutex> l(completionLock);
+        queuedClose = true;
+        if (working || writeInProgress)
+            // no need to summon an IO thread
+            return;
+    }
+    notifyPendingWrite();
 }
 
 bool AsynchIO::writeQueueEmpty() {
@@ -468,7 +480,7 @@ bool AsynchIO::writeQueueEmpty() {
  * called when the read is complete and data is available.
  */
 void AsynchIO::startReading() {
-    if (queuedDelete)
+    if (queuedDelete || queuedClose)
         return;
 
     // (Try to) get a buffer; look on the front since there may be an
@@ -491,6 +503,7 @@ void AsynchIO::startReading() {
                                  readCount);
         DWORD bytesReceived = 0, flags = 0;
         InterlockedIncrement(&opsInProgress);
+        readInProgress = true;
         int status = WSARecv(toSocketHandle(socket),
                              const_cast<LPWSABUF>(result->getWSABUF()), 1,
                              &bytesReceived,
@@ -584,7 +597,6 @@ void AsynchIO::notifyIdle(void) {
 void AsynchIO::startWrite(AsynchIO::BufferBase* buff) {
     writeInProgress = true;
     InterlockedIncrement(&opsInProgress);
-    int writeCount = buff->byteCount-buff->dataCount;
     AsynchWriteResult *result =
         new AsynchWriteResult(boost::bind(&AsynchIO::completion, this, _1),
                               buff,
@@ -619,21 +631,24 @@ void AsynchIO::close(void) {
 void AsynchIO::readComplete(AsynchReadResult *result) {
     int status = result->getStatus();
     size_t bytes = result->getTransferred();
+    readInProgress = false;
     if (status == 0 && bytes > 0) {
-        bool restartRead = true;     // May not if receiver doesn't want more
         if (readCallback)
             readCallback(*this, result->getBuff());
-        if (restartRead)
-            startReading();
+        startReading();
     }
     else {
         // No data read, so put the buffer back. It may be partially filled,
         // so "unread" it back to the front of the queue.
         unread(result->getBuff());
-        if (status == 0)
-            notifyEof();
-        else
+        if (queuedClose) {
+            return; // Expected from cancelRead()
+        }
+        notifyEof();
+        if (status != 0)
+        {
             notifyDisconnect();
+        }
     }
 }
 
@@ -684,6 +699,8 @@ void AsynchIO::writeComplete(AsynchWrite
 }
 
 void AsynchIO::completion(AsynchIoResult *result) {
+    bool closing = false;
+    bool deleting = false;
     {
         ScopedLock<Mutex> l(completionLock);
         if (working) {
@@ -715,6 +732,8 @@ void AsynchIO::completion(AsynchIoResult
                 delete result;
                 result = 0;
                 InterlockedDecrement(&opsInProgress);
+                if (queuedClose && opsInProgress == 1 && readInProgress)
+                    cancelRead();
             }
             // Lock is held again.
             if (completionQueue.empty())
@@ -723,17 +742,40 @@ void AsynchIO::completion(AsynchIoResult
             completionQueue.pop();
         }
         working = false;
+        if (opsInProgress == 0) {
+            closing = queuedClose;
+            deleting = queuedDelete;
+        }
     }
     // Lock released; ok to close if ops are done and close requested.
     // Layer above will call back to queueForDeletion() if it hasn't
     // already been done. If it already has, go ahead and delete.
-    if (opsInProgress == 0) {
-        if (queuedClose)
-            // close() may cause a delete; don't trust 'this' on return
-            close();
-        else if (queuedDelete)
-            delete this;
-    }
+    if (deleting)
+        delete this;
+    else if (closing)
+        // close() may cause a delete; don't trust 'this' on return
+        close();
+}
+
+/*
+ * NOTE - this method must be called in the same context as other completions,
+ * so that the resulting readComplete, and final AsynchIO::close() is 
serialized
+ * after this method returns.
+ */
+void AsynchIO::cancelRead() {
+    if (queuedDelete)
+        return;                 // socket already deleted
+    else {
+        ScopedLock<Mutex> l(completionLock);;
+        if (!completionQueue.empty())
+            return;             // process it; come back later if necessary
+    }
+    // Cancel outstanding read and force to completion.  Otherwise, on a faulty
+    // physical link, the pending read can remain uncompleted indefinitely.
+    // Draining the pending read will result in the official close (and
+    // notifyClosed).  CancelIoEX() is the natural choice, but not available in
+    // XP, so we make do with closesocket().
+    socket.close();
 }
 
 } // namespace windows

Modified: 
qpid/branches/0.8-release-candidates/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.8-release-candidates/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp?rev=1334484&r1=1334483&r2=1334484&view=diff
==============================================================================
--- 
qpid/branches/0.8-release-candidates/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp
 (original)
+++ 
qpid/branches/0.8-release-candidates/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp
 Sat May  5 20:23:48 2012
@@ -96,6 +96,7 @@ void Poller::shutdown() {
     // Allow sloppy code to shut us down more than once.
     if (impl->isShutdown)
         return;
+    impl->isShutdown = true;
     ULONG_PTR key = 1;    // Tell wait() it's a shutdown, not I/O
     PostQueuedCompletionStatus(impl->iocp, 0, key, 0);
 }
@@ -110,7 +111,7 @@ bool Poller::interrupt(PollerHandle&) {
 }
 
 void Poller::run() {
-    do {
+    while (!impl->isShutdown) {
         Poller::Event event = this->wait();
 
         // Handle shutdown
@@ -124,7 +125,7 @@ void Poller::run() {
           // This should be impossible
           assert(false);
         }
-    } while (true);
+    }
 }
 
 void Poller::monitorHandle(PollerHandle& handle, Direction dir) {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to