Date: Thursday, November 30, 2006 @ 21:10:28
Author: gilles
Path: /cvsroot/carob/carob
Modified: include/ControllerWatcher.hpp (1.1 -> 1.2)
include/WatchedControllers.hpp (1.1 -> 1.2)
src/ControllerPool.cpp (1.10 -> 1.11) src/ControllerWatcher.cpp
(1.4 -> 1.5) src/WatchedControllers.cpp (1.1 -> 1.2)
Fix slow close of connections: detached watcher thread and gave him
responsability to destroy all its objects
--------------------------------+
include/ControllerWatcher.hpp | 20 ++++++++++++++--
include/WatchedControllers.hpp | 6 ++--
src/ControllerPool.cpp | 14 ++++-------
src/ControllerWatcher.cpp | 48 +++++++++++++++++++++------------------
src/WatchedControllers.cpp | 13 ++++++----
5 files changed, 61 insertions(+), 40 deletions(-)
Index: carob/include/ControllerWatcher.hpp
diff -u carob/include/ControllerWatcher.hpp:1.1
carob/include/ControllerWatcher.hpp:1.2
--- carob/include/ControllerWatcher.hpp:1.1 Tue Nov 28 17:07:55 2006
+++ carob/include/ControllerWatcher.hpp Thu Nov 30 21:10:28 2006
@@ -26,6 +26,10 @@
#include <sys/types.h> // for pthread_t
#include <vector>
+/**
+ * Function to be passed to pthread_create. Launches run on given
+ * ControllerWatcher instance
+ */
extern "C" void *ControllerWatcherThread(void *);
namespace CarobNS {
@@ -43,20 +47,22 @@
*/
class ControllerWatcher
{
+ // This function destroy some of our members
+ friend void* ::ControllerWatcherThread(void *);
public:
/**
* Creates a controller watcher with the given controllers to watch.<br>
* Pings will be sent to all given controllers
*
* @param controllerList controllers to ping
- * @param callback Callback implementation to call when a controller state
+ * @param callbackPtr Callback implementation to call when a controller state
* changes
* @param pingDelayInMs time to wait between two successive pings
* @param controllerTimeout delay after which a controller will be considered
* as failing if it did not respond to pings
*/
ControllerWatcher(const std::vector<ControllerInfo>& controllerList,
- ControllerStateChangedCallback& callback, int pingDelayInMs,
+ ControllerStateChangedCallback* callbackPtr, int pingDelayInMs,
int controllerTimeout) throw (ConnectionException, UnexpectedException);
~ControllerWatcher();
@@ -78,6 +84,16 @@
*/
void forceControllerDown(const ControllerInfo& c);
+protected:
+ // These accessors are only meant to be used by the thread C function
+ /** Gets the watched controllers to delete */
+ WatchedControllers* getControllersPtr() { return controllers_ptr; }
+ /** Gets the pinger thread to stop him */
+ pthread_t getPingerThread() { return pinger_thread; }
+ /** Gets the ControllerPingSender to delete */
+ ControllerPingSender* getControllerPingSenderPtr() { return pinger_ptr; }
+ /** Gets the socket file descriptor to close */
+ int getSocketFd() { return socket_fd; }
private:
/**
* Reads data from socket and updates lastTimeSeen values accordingly
Index: carob/include/WatchedControllers.hpp
diff -u carob/include/WatchedControllers.hpp:1.1
carob/include/WatchedControllers.hpp:1.2
--- carob/include/WatchedControllers.hpp:1.1 Tue Nov 28 17:07:55 2006
+++ carob/include/WatchedControllers.hpp Thu Nov 30 21:10:28 2006
@@ -80,11 +80,11 @@
* @param initTime initial lastTimeSeen value, typically <code>clock()</code>
* @param controllerTimeout delay after which a controller will be considered
* as failing if it did not respond to pings
- * @param callback Callback implementation to call when a controller state
+ * @param cbPtr Callback implementation to call when a controller state
* changes
*/
WatchedControllers(const std::vector<ControllerInfo>& controllersPrm, long
initTime,
- int controllerTimeout, ControllerStateChangedCallback& callback);
+ int controllerTimeout, ControllerStateChangedCallback* cbPtr);
/**
* Adds a controller to the list and associates the given lastTimeSeenValue
to
@@ -155,7 +155,7 @@
*/
int controller_timeout;
/** Actions to take when a controller goes down or back-alive */
- ControllerStateChangedCallback& callback;
+ ControllerStateChangedCallback* callback_ptr;
};
} //namespace CarobNS
#endif /* WATCHED_CONTROLLERS_H_ */
Index: carob/src/ControllerPool.cpp
diff -u carob/src/ControllerPool.cpp:1.10 carob/src/ControllerPool.cpp:1.11
--- carob/src/ControllerPool.cpp:1.10 Thu Nov 30 17:12:23 2006
+++ carob/src/ControllerPool.cpp Thu Nov 30 21:10:28 2006
@@ -64,9 +64,13 @@
callback_ptr = new SocketKillerCallback(*this);
try
{
- watcher_ptr = new ControllerWatcher(controllerList, *callback_ptr,
+ // Since the destruction of the watcher can be long (waiting for select
+ // timeout), the property of this watcher instance + the callback instance
+ // is transfered to the ControllerWatcherThread C function
+ watcher_ptr = new ControllerWatcher(controllerList, callback_ptr,
pingDelayInMs, controllerTimeoutInMs);
pthread_create(&watcher_thread, NULL, ControllerWatcherThread,
watcher_ptr);
+ pthread_detach(watcher_thread);
}
catch (ConnectionException cause)
{
@@ -78,14 +82,6 @@
AbstractControllerPool::~AbstractControllerPool()
{
watcher_ptr->stop();
- int err = pthread_join(watcher_thread, NULL);
- if (err && isErrorEnabled())
- {
- logError(L"AbstractControllerPool::~AbstractControllerPool",
- L"Failed to join watcher thread");
- }
- delete(watcher_ptr);
- delete(callback_ptr);
alive_controllers.clear();
}
Index: carob/src/ControllerWatcher.cpp
diff -u carob/src/ControllerWatcher.cpp:1.4 carob/src/ControllerWatcher.cpp:1.5
--- carob/src/ControllerWatcher.cpp:1.4 Thu Nov 30 17:12:23 2006
+++ carob/src/ControllerWatcher.cpp Thu Nov 30 21:10:28 2006
@@ -42,8 +42,32 @@
using std::vector;
using std::wstring;
+// extern C thread function that launches the ping
+void* ControllerWatcherThread(void * thisPtr)
+{
+ ControllerWatcher* pt = (ControllerWatcher*)thisPtr;
+ pt->run();
+
+ // Function has finished, it means that we were stopped, so we have to clean
+ // everything:
+ // stop the pinger thread
+ pt->getControllerPingSenderPtr()->stop();
+ // wait for the pinger to finish
+ pthread_join(pt->getPingerThread(), NULL);
+ // pinger has finished, we can close socket...
+ close(pt->getSocketFd());
+ // ...and delete everything
+ delete(pt->getControllerPingSenderPtr());
+ delete(pt->getControllersPtr());
+
+ // delete 'this'
+ delete pt;
+ pthread_exit(NULL);
+ return NULL;
+}
+
ControllerWatcher::ControllerWatcher(const vector<ControllerInfo>&
controllerList,
- ControllerStateChangedCallback& callback, int pingDelayInMs, int
controllerTimeout)
+ ControllerStateChangedCallback* callbackPtr, int pingDelayInMs, int
controllerTimeout)
throw (ConnectionException, UnexpectedException) :
ping_delay_in_ms(pingDelayInMs), controller_timeout(controllerTimeout),
is_stopped(false)
@@ -52,7 +76,7 @@
wstring fctName(L"ControllerWatcher::ControllerWatcher");
controllers_ptr = new WatchedControllers(controllerList,
getCurrentTimeInMs(),
- controllerTimeout, callback);
+ controllerTimeout, callbackPtr);
// Get the ip family of the first controller, this will be the default one
socket_fd = socket(controllerList.front().getFamily(), SOCK_DGRAM, 0);
@@ -90,25 +114,7 @@
ControllerWatcher::~ControllerWatcher()
{
- // stop the pinger thread
- pinger_ptr->stop();
- int err = pthread_join(pinger_thread, NULL);
- if (err && isErrorEnabled())
- {
- logError(L"ControllerWatcher::~ControllerWatcher",
- L"Failed to join pinger thread");
- }
- close(socket_fd);
- delete(pinger_ptr);
- delete(controllers_ptr);
-}
-
-void* ControllerWatcherThread(void * thisPtr)
-{
- ControllerWatcher* pt = (ControllerWatcher*)thisPtr;
- pt->run();
- pthread_exit(NULL);
- return NULL;
+ // Clean-up is done in the C function
}
void ControllerWatcher::processAnswers()
Index: carob/src/WatchedControllers.cpp
diff -u carob/src/WatchedControllers.cpp:1.1
carob/src/WatchedControllers.cpp:1.2
--- carob/src/WatchedControllers.cpp:1.1 Tue Nov 28 17:07:55 2006
+++ carob/src/WatchedControllers.cpp Thu Nov 30 21:10:28 2006
@@ -57,9 +57,9 @@
CriticalSection WatchedControllers::controllers_CS;
WatchedControllers::WatchedControllers(const vector<ControllerInfo>&
controllersPrm,
- long initTime, int controllerTimeout, ControllerStateChangedCallback& cb) :
+ long initTime, int controllerTimeout, ControllerStateChangedCallback*
cbPtr) :
controller_timeout(controllerTimeout),
- callback(cb)
+ callback_ptr(cbPtr)
{
// we are inside a contructor, so we can safely operate on the
// list without worrying of concurrent modifications
@@ -95,7 +95,7 @@
logInfo(fctName, L"Controller " + static_cast<wstring>(ctrl)
+ L" responds to pings again. Adding it to the pool.");
state.setUp();
- callback.onControllerUp(ctrl);
+ callback_ptr->onControllerUp(ctrl);
}
}
@@ -113,7 +113,7 @@
logWarn(fctName, L"Controller " + static_cast<wstring>(iter->first)
+ L" does not respond to pings. Considering it as down.");
iter->second.setDown();
- callback.onControllerDown(iter->first);
+ callback_ptr->onControllerDown(iter->first);
}
}
}
@@ -124,8 +124,11 @@
ControllerState& state = controllers[ctrl];
if (state.isUp()) // don't do the job twice
{
+ if (isWarnEnabled())
+ logWarn(L"WatchedControllers::setControllerDown", L"Controller " +
static_cast<wstring>(ctrl)
+ + L" detected as down by driver");
state.setDown();
- callback.onControllerDown(ctrl);
+ callback_ptr->onControllerDown(ctrl);
}
}
_______________________________________________
Carob-commits mailing list
[email protected]
https://forge.continuent.org/mailman/listinfo/carob-commits