Author: gsim
Date: Tue Jun  1 18:59:52 2010
New Revision: 950205

URL: http://svn.apache.org/viewvc?rev=950205&view=rev
Log:
QPID-2004: Send disconnected event to any handles still registered after 
shutdown to ensure they can clean themselves up

Modified:
    qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h
    qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp?rev=950205&r1=950204&r2=950205&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp Tue Jun  1 18:59:52 2010
@@ -60,8 +60,10 @@ void PollerDispatch::dispatch(sys::Dispa
 
 // Entry point: called if disconnected from  CPG.
 void PollerDispatch::disconnect(sys::DispatchHandle& ) {
-    QPID_LOG(critical, "Disconnected from cluster");
-    onError();
+    if (!poller->hasShutdown()) {
+        QPID_LOG(critical, "Disconnected from cluster");
+        onError();
+    }
 }
 
 }} // namespace qpid::cluster

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h?rev=950205&r1=950204&r2=950205&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h Tue Jun  1 18:59:52 2010
@@ -99,6 +99,8 @@ public:
     QPID_COMMON_EXTERN void monitorHandle(PollerHandle& handle, Direction dir);
     QPID_COMMON_EXTERN void unmonitorHandle(PollerHandle& handle, Direction dir);
     QPID_COMMON_EXTERN Event wait(Duration timeout = TIME_INFINITE);
+
+    QPID_COMMON_EXTERN bool hasShutdown();
 };
 
 /**

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp?rev=950205&r1=950204&r2=950205&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp Tue Jun  1 18:59:52 2010
@@ -22,6 +22,7 @@
 #include "qpid/sys/Poller.h"
 #include "qpid/sys/IOHandle.h"
 #include "qpid/sys/Mutex.h"
+#include "qpid/sys/AtomicCount.h"
 #include "qpid/sys/DeletionManager.h"
 #include "qpid/sys/posix/check.h"
 #include "qpid/sys/posix/PrivatePosix.h"
@@ -33,6 +34,7 @@
 
 #include <assert.h>
 #include <queue>
+#include <set>
 #include <exception>
 
 namespace qpid {
@@ -156,6 +158,37 @@ PollerHandle::~PollerHandle() {
     PollerHandleDeletionManager.markForDeletion(impl);
 }
 
+class HandleSet
+{
+    Mutex lock;
+    std::set<PollerHandle*> handles;
+  public:
+    void add(PollerHandle*);
+    void remove(PollerHandle*);
+    void cleanup();
+};
+
+void HandleSet::add(PollerHandle* h)
+{
+    ScopedLock<Mutex> l(lock);
+    handles.insert(h);
+}
+void HandleSet::remove(PollerHandle* h)
+{
+    ScopedLock<Mutex> l(lock);
+    handles.erase(h);
+}
+void HandleSet::cleanup()
+{
+    // Inform all registered handles of disconnection
+    std::set<PollerHandle*> copy;
+    handles.swap(copy);
+    for (std::set<PollerHandle*>::const_iterator i = copy.begin(); i != copy.end(); ++i) {
+        Poller::Event event(*i, Poller::DISCONNECTED);
+        event.process();
+    }
+}
+
 /**
  * Concrete implementation of Poller to use the Linux specific epoll
  * interface
@@ -230,6 +263,8 @@ class PollerPrivate {
     bool isShutdown;
     InterruptHandle interruptHandle;
     ::sigset_t sigMask;
+    HandleSet registeredHandles;
+    AtomicCount threadCount;
 
     static ::__uint32_t directionToEpollEvent(Poller::Direction dir) {
         switch (dir) {
@@ -308,6 +343,7 @@ void Poller::registerHandle(PollerHandle
     epe.data.u64 = 0; // Keep valgrind happy
     epe.data.ptr = &eh;
 
+    impl->registeredHandles.add(&handle);
     QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_ADD, eh.fd(), &epe));
 
     eh.setActive();
@@ -318,6 +354,7 @@ void Poller::unregisterHandle(PollerHand
     ScopedLock<Mutex> l(eh.lock);
     assert(!eh.isIdle());
 
+    impl->registeredHandles.remove(&handle);
     int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, eh.fd(), 0);
     // Ignore EBADF since deleting a nonexistent fd has the overall required result!
     // And allows the case where a sloppy program closes the fd and then does the delFd()
@@ -475,6 +512,7 @@ void Poller::run() {
         ::sigfillset(&ss);
         ::pthread_sigmask(SIG_SETMASK, &ss, 0);
 
+        ++(impl->threadCount);
         do {
             Event event = wait();
 
@@ -486,6 +524,8 @@ void Poller::run() {
                 switch (event.type) {
                 case SHUTDOWN:
                     PollerHandleDeletionManager.destroyThreadState();
+                    //last thread to respond to shutdown cleans up:
+                    if (--(impl->threadCount) == 0) impl->registeredHandles.cleanup();
                     return;
                 default:
                     // This should be impossible
@@ -497,6 +537,12 @@ void Poller::run() {
         QPID_LOG(error, "IO worker thread exiting with unhandled exception: " << e.what());
     }
     PollerHandleDeletionManager.destroyThreadState();
+    --(impl->threadCount);
+}
+
+bool Poller::hasShutdown()
+{
+    return impl->isShutdown;
 }
 
 Poller::Event Poller::wait(Duration timeout) {

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp?rev=950205&r1=950204&r2=950205&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp Tue Jun  1 18:59:52 2010
@@ -293,6 +293,11 @@ void Poller::shutdown() {
     impl->interrupt();
 }
 
+bool Poller::hasShutdown()
+{
+    return impl->isShutdown;
+}
+
 bool Poller::interrupt(PollerHandle& handle) {
     PollerPrivate::InterruptHandle& ih = impl->interruptHandle;
     PollerHandlePrivate& eh = *static_cast<PollerHandle&>(ih).impl;

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp?rev=950205&r1=950204&r2=950205&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp Tue Jun  1 18:59:52 2010
@@ -100,6 +100,11 @@ void Poller::shutdown() {
     PostQueuedCompletionStatus(impl->iocp, 0, key, 0);
 }
 
+bool Poller::hasShutdown()
+{
+    return impl->isShutdown;
+}
+
 bool Poller::interrupt(PollerHandle&) {
     return false;  // There's no concept of a registered handle.
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to