Date: Thursday, February 2, 2006 @ 18:19:18
Author: gilles
Path: /cvsroot/carob/carob
Modified: include/ControllerConnectPolicy.hpp (1.4 -> 1.5)
src/ControllerConnectPolicy.cpp (1.6 -> 1.7)
Implemented updateSuspectList()
Suspect list is now a vector of struct{controllerInfo/javaSocket*} (it was a map,
but the map turned out to be unnecessary)
Fixed getController() in the round-robin policy: we cannot proceed as in Java
because our suspect list is shared!
JavaSocket is now held by pointer in the suspect list (so we no longer "copy"
the socket!)
Added select timeout customization parameter to updateSuspectList()
Added comment on ignored retryIntervalInMs param + set default
-------------------------------------+
include/ControllerConnectPolicy.hpp | 19 ++-
src/ControllerConnectPolicy.cpp | 184 ++++++++++++++++++++++++++--------
2 files changed, 154 insertions(+), 49 deletions(-)
Index: carob/include/ControllerConnectPolicy.hpp
diff -u carob/include/ControllerConnectPolicy.hpp:1.4
carob/include/ControllerConnectPolicy.hpp:1.5
--- carob/include/ControllerConnectPolicy.hpp:1.4 Wed Jan 25 23:13:30 2006
+++ carob/include/ControllerConnectPolicy.hpp Thu Feb 2 18:19:18 2006
@@ -21,6 +21,8 @@
#ifndef CONTROLLERCONNECTPOLICY_H_
#define CONTROLLERCONNECTPOLICY_H_
+#include "ConnectionParameters.hpp" // for ControllerInfo vector
+
#include "CarobException.hpp"
#include "CriticalSection.hpp"
@@ -44,11 +46,11 @@
* Creates a new <code>AbstractControllerConnectPolicy</code> object
*
* @param controllerList the controller list on which the policy applies
- * @param retryIntervalInMs Interval in milliseconds before retrying to
- * re-connect to a controller that has failed
+ * @param retryIntervalInMs IGNORED ! interval in milliseconds before retrying
+ * to re-connect to a controller that has failed
*/
AbstractControllerConnectPolicy(const std::vector<ControllerInfo>&
controllerList,
- long retryIntervalInMs) throw (DriverException, UnexpectedException);
+ long retryIntervalInMs = 0) throw (DriverException, UnexpectedException);
/**
* Kills the controller ping thread and clear lists
*/
@@ -78,8 +80,10 @@
* Tries to connect to each suspected controller with a non-blocking connect
* and checks the connection state. In case of connection success, send the
* ping command and remove the controller from the suspect list.
+ * @param select_timeout number of milliseconds to wait for the controllers to
+ * respond to a ping connection attempt (default is 20ms)
*/
- static void updateSuspectList();
+ static void updateSuspectList(int select_timeout = 20);
/**
* Returns true if the specified controller is suspected of failure.
* @param controllerInfo the controller to check
@@ -95,10 +99,11 @@
void suspectControllerOfFailure(
ControllerInfo& controllerInfo);
/**
- * Removes the specified controller from the list of suspect controllers
+ * Removes the specified controller from the list of suspect controllers.
+ * If the given controller is not in the list, logs a warning and returns
* @param controller the controller to remove from the list
*/
- void removeControllerFromSuspectList(
+ static void removeControllerFromSuspectList(
const ControllerInfo& controller);
/**
* Gives the number of controller failures since this process started
@@ -122,7 +127,7 @@
{
public:
RoundRobinConnectPolicy(const std::vector<ControllerInfo>& controllerList,
- long retryIntervalInMs) throw (DriverException, UnexpectedException);
+ long retryIntervalInMs = 0) throw (DriverException, UnexpectedException);
ControllerInfo getController() throw (NoMoreControllerException,
UnexpectedException);
private:
Index: carob/src/ControllerConnectPolicy.cpp
diff -u carob/src/ControllerConnectPolicy.cpp:1.6
carob/src/ControllerConnectPolicy.cpp:1.7
--- carob/src/ControllerConnectPolicy.cpp:1.6 Wed Feb 1 16:54:51 2006
+++ carob/src/ControllerConnectPolicy.cpp Thu Feb 2 18:19:18 2006
@@ -22,11 +22,13 @@
#include "ControllerConnectPolicy.hpp"
#include "JavaSocket.hpp"
+#include "Connection.hpp"
#include "ConnectionParameters.hpp"
#include "Common.hpp"
#include <map>
+#include <sstream>
using std::wstring;
@@ -34,22 +36,20 @@
CriticalSection AbstractControllerConnectPolicy::suspected_controllers_CS;
-// ControllerInfo comparison function
-struct SortByFailureNumber
+typedef struct
{
- bool operator()(const ControllerInfo& ci1, const ControllerInfo& ci2) const
- {
- return true;//(ci1.getFailureNumber() < ci2.getFailureNumber());
- }
-};
-
-typedef std::map<ControllerInfo, JavaSocket, SortByFailureNumber> SuspectedList;
-// Static list of suspected controllers
+ ControllerInfo controllerInfo;
+ JavaSocket* pingSocketPtr;
+} SuspectController;
+
+/** Utility typedef to have a short name for suspect_list content type */
+typedef std::vector<SuspectController> SuspectList;
+/** The list of suspected controllers (static => per process) */
namespace
{
/** list of controllers suspected of failure */
- SuspectedList suspected_controllers;
- /** # of controllers that have fail since this driver has been created */
+ SuspectList suspected_controllers;
+ /** # of controllers that have fail since this process has been created */
int controller_failure_number = 0;
};
@@ -68,24 +68,98 @@
controller_list.clear();
}
-void AbstractControllerConnectPolicy::updateSuspectList()
+/*static*/
+void AbstractControllerConnectPolicy::updateSuspectList(int select_timeout /* = 20 */)
{
+ wstring fctName(L"AbstractControllerConnectPolicy::updateSuspectList");
+ if (suspected_controllers.size() < 1)
+ return;
+
LockScope scLs(&suspected_controllers_CS);
- for (SuspectedList::const_iterator iter = suspected_controllers.begin();
+ //create set of fds for select function
+ fd_set writableControllers;
+ FD_ZERO(&writableControllers);
+ struct timeval tv;
+ tv.tv_sec = 0;
+ tv.tv_usec = select_timeout;
+ int fdMax = -1, retVal = 0;
+ for (SuspectList::iterator iter = suspected_controllers.begin();
iter != suspected_controllers.end(); iter++)
{
- //TODO:
- //connect
- //select
- //foreach writable (in select result) => remove from list
+ try
+ {
+ //connect socket and add it to the list of fds
+ if ((*iter).pingSocketPtr->connectTo((*iter).controllerInfo.getHostName(),
+ (*iter).controllerInfo.getHostPort()))
+ {
+ int sock = (*iter).pingSocketPtr->getFd();
+ if (sock > fdMax)
+ fdMax = sock;
+ FD_SET(sock, &writableControllers);
+ }
+ }
+ catch (...){}
+ }
+ retVal = select(fdMax+1, NULL, &writableControllers, NULL, &tv);
+ if (retVal == -1)
+ {
+ if (isErrorEnabled())
+ {
+ logError(fctName, L"Select returned error #"+toWString(errno));
+ }
+ return;
+ }
+ // if (retVal == 0) => timeout, no controller up again, do nothing
+ // If controllers are found up again, let's remove them from the list of
+ // suspects
+ if (retVal > 0)
+ {
+ // Anti-perfomant-but-very-sure way to remove up-again controllers from the
+ // list of suspects
+ bool mayHaveControllerLeftToRemove = true;
+ while (mayHaveControllerLeftToRemove)
+ {
+ mayHaveControllerLeftToRemove = false;
+ for (SuspectList::iterator iter = suspected_controllers.begin();
+ iter != suspected_controllers.end(); iter++)
+ {
+ if (FD_ISSET((*iter).pingSocketPtr->getFd(), &writableControllers))
+ {
+ if (isDebugEnabled())
+ {
+ logDebug(fctName, L"Controller "
+ + static_cast<wstring>((*iter).controllerInfo)
+ + L" is up again. Removing it from the list of suspects (list size="
+ + toWString(suspected_controllers.size())+L")");
+ }
+ //send ping and close socket
+ (*iter).pingSocketPtr->writeJavaInt(Ping);
+ // For performance, we could:
+ //1.delete (*iter).pingSocketPtr;
+ //2.suspected_controllers.erase(iter);
+ // But we don't need perf in this part of code, so let's factorize
+ // and use our dedicated function
+ removeControllerFromSuspectList((*iter).controllerInfo);
+ mayHaveControllerLeftToRemove = true;
+ break;
+ }
+ }
+ }
}
}
+/*static*/
bool AbstractControllerConnectPolicy::isSuspectedOfFailure(
const ControllerInfo& controllerInfo)
{
- SuspectedList::const_iterator iter = suspected_controllers.find(controllerInfo);
- return iter != suspected_controllers.end();
+ // Just compare controller info with each item in the whole list
+ for (SuspectList::const_iterator iter = suspected_controllers.begin();
+ iter != suspected_controllers.end(); iter++)
+ {
+ if ((*iter).controllerInfo == controllerInfo)
+ return true;
+ }
+ return false;
}
void AbstractControllerConnectPolicy::suspectControllerOfFailure(
@@ -97,42 +171,62 @@
if (isSuspectedOfFailure(controllerInfo))
return;
- // Check that the controllerInfo is correct and add it to the list
+ // Add it to the list
for (size_t i = 0; i < controller_list.size(); i++)
{
- ControllerInfo controller = controller_list[i];
- if (controller == controllerInfo)
+ if (controllerInfo == controller_list[i])
{
- // prepare the ping operation to this controller
- JavaSocket newSocket;
- newSocket.create(false); //create non blocking
LockScope scLs(&suspected_controllers_CS);
- // assign a failure number to this controller
+ //just for reporting (in the future ?)
controller_failure_number++;
- //controllerInfo.setFailureNumber(controller_failure_number);
- //add the controller and its prepared socket to the list
- suspected_controllers[controllerInfo] = newSocket;
+ SuspectController newSuspect;
+ newSuspect.controllerInfo = controllerInfo;
+ // creates and prepares a new socket for pinging the controller
+ newSuspect.pingSocketPtr = new JavaSocket();
+ newSuspect.pingSocketPtr->create(false); //create non blocking
+ //add the controller and its socket to the list
+ suspected_controllers.push_back(newSuspect);
if (isDebugEnabled())
- logDebug(fctName, L"Controller " + (wstring)controllerInfo
- + L" is now suspected of failure (#");
+ logDebug(fctName, L"Controller " + static_cast<wstring>(controllerInfo)
+ + L" is now suspected of failure (list size="
+ + toWString(suspected_controllers.size())+L")");
return;
}
}
}
+/*static*/
void AbstractControllerConnectPolicy::removeControllerFromSuspectList(
- const ControllerInfo& controller)
+ const ControllerInfo& controllerInfo)
{
wstring fctName(L"AbstractControllerConnectPolicy::removeControllerFromSuspectList");
LockScope scLs(&suspected_controllers_CS);
- suspected_controllers.erase(controller);
- if (isDebugEnabled())
+
+ for (SuspectList::iterator iter = suspected_controllers.begin();
+ iter != suspected_controllers.end(); iter++)
+ {
+ if ((*iter).controllerInfo == controllerInfo)
+ {
+ // found it
+ delete (*iter).pingSocketPtr; //free the ping socket
+ suspected_controllers.erase(iter);
+ if (isDebugEnabled())
+ {
+ logDebug(fctName, L"Controller " + (wstring)controllerInfo
+ + L" has been removed from suspect list (list size="
+ + toWString(suspected_controllers.size())+L")");
+ }
+ return;
+ }
+ }
+ if (isWarningEnabled())
{
- logDebug(fctName, L"Controller " + (wstring)controller
- + L" is removed from suspect list");
+ logWarning(fctName, L"Controller " + (wstring)controllerInfo
+ + L" is not (anymore?) in the suspect list");
}
}
+/*static*/
int AbstractControllerConnectPolicy::getNumberOfFailures()
{
return controller_failure_number;
@@ -158,15 +252,21 @@
LockScope ls(&policy_CS);
{
LockScope scLs(&suspected_controllers_CS);
- if (suspected_controllers.size() == controller_list.size())
- throw NoMoreControllerException(L"All "
- + toWString(suspected_controllers.size())
- + L" controllers down");
+ unsigned int testedControllers = 0;
do
{
index = (index + 1) % controller_list.size();
+ testedControllers++;
+ }
+ while (isSuspectedOfFailure(controller_list[index])
+ && testedControllers <= controller_list.size());
+ // if there are on more controllers up => exception
+ if (testedControllers > controller_list.size())
+ {
+ throw NoMoreControllerException(L"All "
+ + toWString(suspected_controllers.size())
+ + L" controllers down");
}
- while (isSuspectedOfFailure(controller_list[index]));
}
if (isDebugEnabled())
logDebug(fctName, L"Selected controller[" + toWString(index) + L"]:"
_______________________________________________
Carob-commits mailing list
[email protected]
https://forge.continuent.org/mailman/listinfo/carob-commits