Date: Thursday, November 30, 2006 @ 21:10:28
  Author: gilles
    Path: /cvsroot/carob/carob

Modified: include/ControllerWatcher.hpp (1.1 -> 1.2)
          include/WatchedControllers.hpp (1.1 -> 1.2)
          src/ControllerPool.cpp (1.10 -> 1.11) src/ControllerWatcher.cpp
          (1.4 -> 1.5) src/WatchedControllers.cpp (1.1 -> 1.2)

Fix slow close of connections: detached the watcher thread and gave it 
responsibility for destroying all its objects


--------------------------------+
 include/ControllerWatcher.hpp  |   20 ++++++++++++++--
 include/WatchedControllers.hpp |    6 ++--
 src/ControllerPool.cpp         |   14 ++++-------
 src/ControllerWatcher.cpp      |   48 +++++++++++++++++++++------------------
 src/WatchedControllers.cpp     |   13 ++++++----
 5 files changed, 61 insertions(+), 40 deletions(-)


Index: carob/include/ControllerWatcher.hpp
diff -u carob/include/ControllerWatcher.hpp:1.1 
carob/include/ControllerWatcher.hpp:1.2
--- carob/include/ControllerWatcher.hpp:1.1     Tue Nov 28 17:07:55 2006
+++ carob/include/ControllerWatcher.hpp Thu Nov 30 21:10:28 2006
@@ -26,6 +26,10 @@
 #include <sys/types.h> // for pthread_t
 #include <vector>
 
+/**
+ * Function to be passed to pthread_create. Launches run on given
+ * ControllerWatcher instance
+ */ 
 extern "C" void *ControllerWatcherThread(void *);
 
 namespace CarobNS {
@@ -43,20 +47,22 @@
  */
 class ControllerWatcher
 {
+  // This function destroys some of our members
+  friend void* ::ControllerWatcherThread(void *);
 public:
   /**
    * Creates a controller watcher with the given controllers to watch.<br>
    * Pings will be sent to all given controllers
    * 
    * @param controllerList controllers to ping
-   * @param callback Callback implementation to call when a controller state
+   * @param callbackPtr Callback implementation to call when a controller state
    *          changes
    * @param pingDelayInMs time to wait between two successive pings
    * @param controllerTimeout delay after which a controller will be considered
    *          as failing if it did not respond to pings
    */
   ControllerWatcher(const std::vector<ControllerInfo>& controllerList,
-      ControllerStateChangedCallback& callback, int pingDelayInMs,
+      ControllerStateChangedCallback* callbackPtr, int pingDelayInMs,
       int controllerTimeout) throw (ConnectionException, UnexpectedException);
 
   ~ControllerWatcher();
@@ -78,6 +84,16 @@
    */
   void                  forceControllerDown(const ControllerInfo& c);
 
+protected:
+  // These accessors are only meant to be used by the thread C function
+  /** Gets the watched controllers to delete */
+  WatchedControllers*   getControllersPtr() { return controllers_ptr; }
+  /** Gets the pinger thread to stop him */
+  pthread_t             getPingerThread() { return pinger_thread; }
+  /** Gets the ControllerPingSender to delete */
+  ControllerPingSender* getControllerPingSenderPtr() { return pinger_ptr; }
+  /** Gets the socket file descriptor to close */
+  int                   getSocketFd() { return socket_fd; }
 private:
   /**
    * Reads data from socket and updates lastTimeSeen values accordingly
Index: carob/include/WatchedControllers.hpp
diff -u carob/include/WatchedControllers.hpp:1.1 
carob/include/WatchedControllers.hpp:1.2
--- carob/include/WatchedControllers.hpp:1.1    Tue Nov 28 17:07:55 2006
+++ carob/include/WatchedControllers.hpp        Thu Nov 30 21:10:28 2006
@@ -80,11 +80,11 @@
    * @param initTime initial lastTimeSeen value, typically <code>clock()</code>
    * @param controllerTimeout delay after which a controller will be considered
    *          as failing if it did not respond to pings
-   * @param callback Callback implementation to call when a controller state
+   * @param cbPtr Callback implementation to call when a controller state
    *          changes
    */
   WatchedControllers(const std::vector<ControllerInfo>& controllersPrm, long 
initTime,
-      int controllerTimeout, ControllerStateChangedCallback& callback);
+      int controllerTimeout, ControllerStateChangedCallback* cbPtr);
 
   /**
    * Adds a controller to the list and associates the given lastTimeSeenValue 
to
@@ -155,7 +155,7 @@
    */
   int                                         controller_timeout;
   /** Actions to take when a controller goes down or back-alive */
-  ControllerStateChangedCallback&             callback;
+  ControllerStateChangedCallback*             callback_ptr;
 };
 } //namespace CarobNS
 #endif /* WATCHED_CONTROLLERS_H_ */
Index: carob/src/ControllerPool.cpp
diff -u carob/src/ControllerPool.cpp:1.10 carob/src/ControllerPool.cpp:1.11
--- carob/src/ControllerPool.cpp:1.10   Thu Nov 30 17:12:23 2006
+++ carob/src/ControllerPool.cpp        Thu Nov 30 21:10:28 2006
@@ -64,9 +64,13 @@
   callback_ptr = new SocketKillerCallback(*this);
   try
   {
-    watcher_ptr = new ControllerWatcher(controllerList, *callback_ptr,
+    // Since the destruction of the watcher can be long (waiting for select
+    // timeout), ownership of this watcher instance and the callback instance
+    // is transferred to the ControllerWatcherThread C function
+    watcher_ptr = new ControllerWatcher(controllerList, callback_ptr,
         pingDelayInMs, controllerTimeoutInMs);
     pthread_create(&watcher_thread, NULL, ControllerWatcherThread, 
watcher_ptr);
+    pthread_detach(watcher_thread);
   }
   catch (ConnectionException cause)
   {
@@ -78,14 +82,6 @@
 AbstractControllerPool::~AbstractControllerPool()
 {
   watcher_ptr->stop();
-  int err = pthread_join(watcher_thread, NULL);
-  if (err && isErrorEnabled())
-  {
-    logError(L"AbstractControllerPool::~AbstractControllerPool",
-        L"Failed to join watcher thread");
-  }
-  delete(watcher_ptr);
-  delete(callback_ptr);
   alive_controllers.clear();
 }
 
Index: carob/src/ControllerWatcher.cpp
diff -u carob/src/ControllerWatcher.cpp:1.4 carob/src/ControllerWatcher.cpp:1.5
--- carob/src/ControllerWatcher.cpp:1.4 Thu Nov 30 17:12:23 2006
+++ carob/src/ControllerWatcher.cpp     Thu Nov 30 21:10:28 2006
@@ -42,8 +42,32 @@
 using std::vector;
 using std::wstring;
 
+// extern C thread function that launches the ping
+void* ControllerWatcherThread(void * thisPtr)
+{
+  ControllerWatcher* pt = (ControllerWatcher*)thisPtr;
+  pt->run();
+  
+  // Function has finished, it means that we were stopped, so we have to clean
+  // everything:
+  // stop the pinger thread
+  pt->getControllerPingSenderPtr()->stop();
+  // wait for the pinger to finish
+  pthread_join(pt->getPingerThread(), NULL);
+  // pinger has finished, we can close socket...
+  close(pt->getSocketFd());
+  // ...and delete everything
+  delete(pt->getControllerPingSenderPtr());
+  delete(pt->getControllersPtr());
+  
+  // delete 'this'
+  delete pt;
+  pthread_exit(NULL);
+  return NULL;
+}
+
 ControllerWatcher::ControllerWatcher(const vector<ControllerInfo>& 
controllerList,
-      ControllerStateChangedCallback& callback, int pingDelayInMs, int 
controllerTimeout)
+      ControllerStateChangedCallback* callbackPtr, int pingDelayInMs, int 
controllerTimeout)
       throw (ConnectionException, UnexpectedException) :
       ping_delay_in_ms(pingDelayInMs), controller_timeout(controllerTimeout),
       is_stopped(false)
@@ -52,7 +76,7 @@
   wstring fctName(L"ControllerWatcher::ControllerWatcher");
 
   controllers_ptr = new WatchedControllers(controllerList, 
getCurrentTimeInMs(),
-      controllerTimeout, callback);
+      controllerTimeout, callbackPtr);
 
   // Get the ip family of the first controller, this will be the default one
   socket_fd = socket(controllerList.front().getFamily(), SOCK_DGRAM, 0);
@@ -90,25 +114,7 @@
 
 ControllerWatcher::~ControllerWatcher()
 {
-  // stop the pinger thread
-  pinger_ptr->stop();
-  int err = pthread_join(pinger_thread, NULL);
-  if (err && isErrorEnabled())
-  {
-    logError(L"ControllerWatcher::~ControllerWatcher",
-        L"Failed to join pinger thread");
-  }
-  close(socket_fd);
-  delete(pinger_ptr);
-  delete(controllers_ptr);
-}
-
-void* ControllerWatcherThread(void * thisPtr)
-{
-  ControllerWatcher* pt = (ControllerWatcher*)thisPtr;
-  pt->run();
-  pthread_exit(NULL);
-  return NULL;
+  // Clean-up is done in the C function
 }
 
 void ControllerWatcher::processAnswers()
Index: carob/src/WatchedControllers.cpp
diff -u carob/src/WatchedControllers.cpp:1.1 
carob/src/WatchedControllers.cpp:1.2
--- carob/src/WatchedControllers.cpp:1.1        Tue Nov 28 17:07:55 2006
+++ carob/src/WatchedControllers.cpp    Thu Nov 30 21:10:28 2006
@@ -57,9 +57,9 @@
 CriticalSection WatchedControllers::controllers_CS;
 
 WatchedControllers::WatchedControllers(const vector<ControllerInfo>& 
controllersPrm,
-    long initTime, int controllerTimeout, ControllerStateChangedCallback& cb) :
+    long initTime, int controllerTimeout, ControllerStateChangedCallback* 
cbPtr) :
     controller_timeout(controllerTimeout),
-    callback(cb)
+    callback_ptr(cbPtr)
 {
   // we are inside a contructor, so we can safely operate on the
   // list without worrying of concurrent modifications
@@ -95,7 +95,7 @@
       logInfo(fctName, L"Controller " + static_cast<wstring>(ctrl)
            + L" responds to pings again. Adding it to the pool.");
     state.setUp();
-    callback.onControllerUp(ctrl);
+    callback_ptr->onControllerUp(ctrl);
   }
 }
 
@@ -113,7 +113,7 @@
         logWarn(fctName, L"Controller " + static_cast<wstring>(iter->first)
              + L" does not respond to pings. Considering it as down.");
       iter->second.setDown();
-      callback.onControllerDown(iter->first);
+      callback_ptr->onControllerDown(iter->first);
     }
   }
 }
@@ -124,8 +124,11 @@
   ControllerState& state = controllers[ctrl];
   if (state.isUp()) // don't do the job twice
   {
+    if (isWarnEnabled())
+      logWarn(L"WatchedControllers::setControllerDown", L"Controller " + 
static_cast<wstring>(ctrl)
+           + L" detected as down by driver");
     state.setDown();
-    callback.onControllerDown(ctrl);
+    callback_ptr->onControllerDown(ctrl);
   }
 }
 

_______________________________________________
Carob-commits mailing list
[email protected]
https://forge.continuent.org/mailman/listinfo/carob-commits

Reply via email to