Hi Steve,

I've simplified PollableCondition to get rid of disarm/rearm - I think the requirement for that was just my misunderstanding of the Poller.

Attached patch looks good on linux, and I think will work on Windows - could you take a look at it and test it on windows?

Cheers,
Alan.
>From 454367411f0abc0baafe99aab75411611cf38c92 Mon Sep 17 00:00:00 2001
From: Alan Conway <[email protected]>
Date: Wed, 8 Jul 2009 17:50:53 -0400
Subject: [PATCH] Simplified PollableCondition

---
 qpid/cpp/src/qpid/sys/PollableCondition.h          |   19 +---
 qpid/cpp/src/qpid/sys/PollableQueue.h              |    8 +-
 qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp  |  113 +++++---------------
 .../cpp/src/qpid/sys/windows/PollableCondition.cpp |   13 +--
 qpid/cpp/src/tests/PollableCondition.cpp           |   28 ++----
 5 files changed, 41 insertions(+), 140 deletions(-)

diff --git a/qpid/cpp/src/qpid/sys/PollableCondition.h b/qpid/cpp/src/qpid/sys/PollableCondition.h
index f49fb22..2eb6f2d 100644
--- a/qpid/cpp/src/qpid/sys/PollableCondition.h
+++ b/qpid/cpp/src/qpid/sys/PollableCondition.h
@@ -44,28 +44,13 @@ public:
 
     /**
      * Set the condition. Triggers callback to Callback from Poller.
-     * When callback is made, condition is suspended. Call rearm() to
-     * resume reacting to the condition.
      */
     QPID_COMMON_EXTERN void set();
 
     /**
-     * Get the current state of the condition, then clear it.
-     *
-     * @return The state of the condition before it was cleared.
+     * Clear the condition. Stops callbacks from Poller.
      */
-    QPID_COMMON_EXTERN bool clear();
-
-    /**
-     * Temporarily suspend the ability for the poller to react to the
-     * condition. It can be rearm()ed later.
-     */
-    QPID_COMMON_EXTERN void disarm();
-
-    /**
-     * Reset the ability for the poller to react to the condition.
-     */
-    QPID_COMMON_EXTERN void rearm();
+    QPID_COMMON_EXTERN void clear();
 
  private:
     PollableConditionPrivate *impl;
diff --git a/qpid/cpp/src/qpid/sys/PollableQueue.h b/qpid/cpp/src/qpid/sys/PollableQueue.h
index 1d390a6..ab84748 100644
--- a/qpid/cpp/src/qpid/sys/PollableQueue.h
+++ b/qpid/cpp/src/qpid/sys/PollableQueue.h
@@ -119,7 +119,6 @@ template <class T> void PollableQueue<T>::start() {
     if (!stopped) return;
     stopped = false;
     if (!queue.empty()) condition.set();
-    condition.rearm();
 }
 
 template <class T> PollableQueue<T>::~PollableQueue() {
@@ -139,7 +138,6 @@ template <class T> void PollableQueue<T>::dispatch(PollableCondition& cond) {
     dispatcher = Thread();
     if (queue.empty()) cond.clear();
     if (stopped) lock.notifyAll();
-    else cond.rearm();
 }
 
 template <class T> void PollableQueue<T>::process() {
@@ -166,11 +164,11 @@ template <class T> void PollableQueue<T>::shutdown() {
 template <class T> void PollableQueue<T>::stop() {
     ScopedLock l(lock);
     if (stopped) return;
-    condition.disarm();
+    condition.clear();
     stopped = true;
     // Avoid deadlock if stop is called from the dispatch thread
-    while (dispatcher.id() && dispatcher.id() != Thread::current().id())
-        lock.wait();
+    if (dispatcher.id() != Thread::current().id()) 
+        while (dispatcher.id()) lock.wait();
 }
 
 }} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp b/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp
index 0991e5f..b22a615 100644
--- a/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp
+++ b/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp
@@ -46,8 +46,8 @@ private:
     ~PollableConditionPrivate();
 
     void dispatch(sys::DispatchHandle& h);
-    void rewatch();
-    void unwatch();
+    void set();
+    void clear();
 
 private:
     PollableCondition::Callback cb;
@@ -57,10 +57,11 @@ private:
     std::auto_ptr<DispatchHandleRef> handle;
 };
 
-PollableConditionPrivate::PollableConditionPrivate(const sys::PollableCondition::Callback& cb,
-                                                   sys::PollableCondition& parent,
-                                                   const boost::shared_ptr<sys::Poller>& poller)
-  : IOHandle(new sys::IOHandlePrivate), cb(cb), parent(parent)
+PollableConditionPrivate::PollableConditionPrivate(
+    const sys::PollableCondition::Callback& cb,
+    sys::PollableCondition& parent,
+    const boost::shared_ptr<sys::Poller>& poller
+) : IOHandle(new sys::IOHandlePrivate), cb(cb), parent(parent)
 {
     int fds[2];
     if (::pipe(fds) == -1)
@@ -71,39 +72,41 @@ PollableConditionPrivate::PollableConditionPrivate(const sys::PollableCondition:
         throw ErrnoException(QPID_MSG("Can't create PollableCondition"));
     if (::fcntl(writeFd, F_SETFL, O_NONBLOCK) == -1)
         throw ErrnoException(QPID_MSG("Can't create PollableCondition"));
-    handle.reset (new DispatchHandleRef(*this,
-                                        boost::bind(&sys::PollableConditionPrivate::dispatch, this, _1),
-                                        0, 0));
+    handle.reset (new DispatchHandleRef(
+                      *this,
+                      boost::bind(&sys::PollableConditionPrivate::dispatch, this, _1),
+                      0, 0));
     handle->startWatch(poller);
     handle->unwatch();
+
+    // Make the read FD readable
+    static const char dummy=0;
+    ssize_t n = ::write(writeFd, &dummy, 1);
+    if (n == -1 && errno != EAGAIN)
+        throw ErrnoException("Error setting PollableCondition");
 }
 
-PollableConditionPrivate::~PollableConditionPrivate()
-{
+PollableConditionPrivate::~PollableConditionPrivate() {
     handle->stopWatch();
     close(writeFd);
 }
 
-void PollableConditionPrivate::dispatch(sys::DispatchHandle& /*h*/)
-{
+void PollableConditionPrivate::dispatch(sys::DispatchHandle&) {
     cb(parent);
 }
 
-void PollableConditionPrivate::rewatch()
-{
+void PollableConditionPrivate::set() {
     handle->rewatch();
 }
 
-void PollableConditionPrivate::unwatch()
-{
+void PollableConditionPrivate::clear() {
     handle->unwatch();
 }
 
-  /* PollableCondition */
 
 PollableCondition::PollableCondition(const Callback& cb,
-                                     const boost::shared_ptr<sys::Poller>& poller)
-  : impl(new PollableConditionPrivate(cb, *this, poller))
+                                     const boost::shared_ptr<sys::Poller>& poller
+) : impl(new PollableConditionPrivate(cb, *this, poller))
 {
 }
 
@@ -112,75 +115,9 @@ PollableCondition::~PollableCondition()
     delete impl;
 }
 
-void PollableCondition::set() {
-    static const char dummy=0;
-    ssize_t n = ::write(impl->writeFd, &dummy, 1);
-    if (n == -1 && errno != EAGAIN)
-        throw ErrnoException("Error setting PollableCondition");
-}
-
-bool PollableCondition::clear() {
-    char buf[256];
-    ssize_t n;
-    bool wasSet = false;
-    while ((n = ::read(impl->impl->fd, buf, sizeof(buf))) > 0) 
-        wasSet = true;
-    if (n == -1 && errno != EAGAIN)
-        throw ErrnoException(QPID_MSG("Error clearing PollableCondition"));
-    return wasSet;
-}
-
-void PollableCondition::disarm() {
-    impl->unwatch();
-}
-
-void PollableCondition::rearm() {
-    impl->rewatch();
-}
-
-
-#if 0
-// FIXME aconway 2008-08-12: More efficient Linux implementation using
-// eventfd system call.  Move to separate file & do configure.ac test
-// to enable this when ::eventfd() is available.
-
-#include <sys/eventfd.h>
+void PollableCondition::set() { impl->set(); }
 
-namespace qpid {
-namespace sys {
-
-PollableConditionPrivate::PollableConditionPrivate(const PollableCondition::Callback& cb,
-                                                   sys::PollableCondition& parent,
-                                                   const boost::shared_ptr<sys::Poller>& poller)
-  : cb(cb), parent(parent), poller(poller),
-    IOHandle(new sys::IOHandlePrivate) {
-    impl->fd = ::eventfd(0, 0);
-    if (impl->fd < 0) throw ErrnoException("conditionfd() failed");
-}
-
-void PollableCondition::set() {
-    static const uint64_t value=1;
-    ssize_t n = ::write(impl->impl->fd,
-                        reinterpret_cast<const void*>(&value), 8);
-    if (n != 8) throw ErrnoException("write failed on conditionfd");
-}
-
-bool PollableCondition::clear() {
-    char buf[8];
-    ssize_t n = ::read(impl->impl->fd, buf, 8);
-    if (n != 8) throw ErrnoException("read failed on conditionfd");
-    return *reinterpret_cast<uint64_t*>(buf);
-}
-
-void PollableCondition::disarm() {
-  // ????
-}
-
-void PollableCondition::rearm() {
-  // ????
-}
-    
-#endif
+void PollableCondition::clear() { impl->clear(); }
 
 }} // namespace qpid::sys
 
diff --git a/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp b/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp
index 8291393..2ba9067 100644
--- a/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp
+++ b/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp
@@ -107,17 +107,8 @@ void PollableCondition::set() {
     impl->poke();
 }
 
-bool PollableCondition::clear() {
-    return (0 != ::InterlockedExchange(&impl->isSet, 0));
-}
-
-void PollableCondition::disarm() {
-    ::InterlockedExchange(&impl->armed, 0);
-}
-
-void PollableCondition::rearm() {
-    if (0 == ::InterlockedExchange(&impl->armed, 1) && impl->isSet)
-        impl->poke();
+void PollableCondition::clear() {
+    ::InterlockedExchange(&impl->isSet, 0);
 }
 
 }} // namespace qpid::sys
diff --git a/qpid/cpp/src/tests/PollableCondition.cpp b/qpid/cpp/src/tests/PollableCondition.cpp
index 33664d4..4e28aa5 100644
--- a/qpid/cpp/src/tests/PollableCondition.cpp
+++ b/qpid/cpp/src/tests/PollableCondition.cpp
@@ -38,7 +38,7 @@ const Duration LONG = TIME_SEC/10;
 
 class  Callback {
   public:    
-    enum Action { NONE, DISARM, CLEAR, DISARM_CLEAR };
+    enum Action { NONE, CLEAR };
 
     Callback() : count(), action(NONE) {}
 
@@ -47,9 +47,7 @@ class  Callback {
         ++count;
         switch(action) {
           case NONE: break; 
-          case DISARM:  pc.disarm(); break;
           case CLEAR: pc.clear(); break;
-          case DISARM_CLEAR: pc.disarm(); pc.clear(); break;
         }
         action = NONE;
         lock.notify();
@@ -86,27 +84,19 @@ QPID_AUTO_TEST_CASE(testPollableCondition) {
 
     Thread runner = Thread(*poller);
     
-    BOOST_CHECK(callback.isNotCalling()); // condition is not set or armed.
-
-    pc.rearm();                          
-    BOOST_CHECK(callback.isNotCalling()); // Armed but not set
+    BOOST_CHECK(callback.isNotCalling()); // condition is not set.
 
     pc.set();
-    BOOST_CHECK(callback.isCalling()); // Armed and set.
-    BOOST_CHECK(callback.isCalling()); // Still armed and set.
-
-    callback.nextCall(Callback::DISARM);
-    BOOST_CHECK(callback.isNotCalling()); // set but not armed
+    BOOST_CHECK(callback.isCalling()); // Set.
+    BOOST_CHECK(callback.isCalling()); // Still set.
 
-    pc.rearm();
-    BOOST_CHECK(callback.isCalling()); // Armed and set.
-    callback.nextCall(Callback::CLEAR);    
-    BOOST_CHECK(callback.isNotCalling()); // armed but not set
+    callback.nextCall(Callback::CLEAR);
+    BOOST_CHECK(callback.isNotCalling()); // Cleared
 
     pc.set();
-    BOOST_CHECK(callback.isCalling()); // Armed and set.
-    callback.nextCall(Callback::DISARM_CLEAR);    
-    BOOST_CHECK(callback.isNotCalling()); // not armed or set.
+    BOOST_CHECK(callback.isCalling()); // Set.
+    callback.nextCall(Callback::CLEAR);    
+    BOOST_CHECK(callback.isNotCalling()); // Cleared.
 
     poller->shutdown();
     runner.join();
-- 
1.5.5.6


---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to