Date: Thursday, February 2, 2006 @ 18:19:18
  Author: gilles
    Path: /cvsroot/carob/carob

Modified: include/ControllerConnectPolicy.hpp (1.4 -> 1.5)
          src/ControllerConnectPolicy.cpp (1.6 -> 1.7)

Implemented updateSuspectList()
Suspect list is now a vector of struct{controllerInfo/javaSocket*} (it was a
map, which turned out to be useless)
Fixed getController() in the round-robin policy: we cannot do as in Java
because our suspect list is shared!
JavaSocket is now a pointer in the suspect list (so we no longer "copy" the
socket!)
Added select timeout customization parameter to updateSuspectList()
Added a comment on the ignored retryIntervalInMs param and gave it a default value


-------------------------------------+
 include/ControllerConnectPolicy.hpp |   19 ++-
 src/ControllerConnectPolicy.cpp     |  184 ++++++++++++++++++++++++++--------
 2 files changed, 154 insertions(+), 49 deletions(-)


Index: carob/include/ControllerConnectPolicy.hpp
diff -u carob/include/ControllerConnectPolicy.hpp:1.4 
carob/include/ControllerConnectPolicy.hpp:1.5
--- carob/include/ControllerConnectPolicy.hpp:1.4       Wed Jan 25 23:13:30 2006
+++ carob/include/ControllerConnectPolicy.hpp   Thu Feb  2 18:19:18 2006
@@ -21,6 +21,8 @@
 #ifndef CONTROLLERCONNECTPOLICY_H_
 #define CONTROLLERCONNECTPOLICY_H_
 
+#include "ConnectionParameters.hpp" // for ControllerInfo vector
+
 #include "CarobException.hpp"
 #include "CriticalSection.hpp"
 
@@ -44,11 +46,11 @@
    * Creates a new <code>AbstractControllerConnectPolicy</code> object
    * 
    * @param controllerList the controller list on which the policy applies
-   * @param retryIntervalInMs Interval in milliseconds before retrying to
-   *          re-connect to a controller that has failed
+   * @param retryIntervalInMs IGNORED ! interval in milliseconds before 
retrying
+   *          to re-connect to a controller that has failed
    */
   AbstractControllerConnectPolicy(const std::vector<ControllerInfo>& 
controllerList,
-      long retryIntervalInMs) throw (DriverException, UnexpectedException);
+      long retryIntervalInMs = 0) throw (DriverException, UnexpectedException);
   /**
    * Kills the controller ping thread and clear lists
    */
@@ -78,8 +80,10 @@
    * Tries to connect to each suspected controller with a non-blocking connect
    * and checks the connection state. In case of connection success, send the
    * ping command and remove the controller from the suspect list.
+   * @param select_timeout number of milliseconds to wait for the controllers 
to
+   *        respond to a ping connection attempt (default is 20ms)
    */
-  static void                 updateSuspectList();
+  static void                 updateSuspectList(int select_timeout = 20);
   /**
    * Returns true if the specified controller is suspected of failure.
    * @param controllerInfo the controller to check
@@ -95,10 +99,11 @@
   void                        suspectControllerOfFailure(
                                   ControllerInfo& controllerInfo);
   /**
-   * Removes the specified controller from the list of suspect controllers
+   * Removes the specified controller from the list of suspect controllers.
+   * If the given controller is not in the list, logs a warning and returns
    * @param controller the controller to remove from the list
    */
-  void                        removeControllerFromSuspectList(
+  static void                 removeControllerFromSuspectList(
                                   const ControllerInfo& controller);
   /**
    * Gives the number of controller failures since this process started
@@ -122,7 +127,7 @@
 {
 public:
   RoundRobinConnectPolicy(const std::vector<ControllerInfo>& controllerList,
-      long retryIntervalInMs) throw (DriverException, UnexpectedException);
+    long retryIntervalInMs = 0) throw (DriverException, UnexpectedException);
   ControllerInfo              getController() throw (NoMoreControllerException,
                                   UnexpectedException);
 private:
Index: carob/src/ControllerConnectPolicy.cpp
diff -u carob/src/ControllerConnectPolicy.cpp:1.6 
carob/src/ControllerConnectPolicy.cpp:1.7
--- carob/src/ControllerConnectPolicy.cpp:1.6   Wed Feb  1 16:54:51 2006
+++ carob/src/ControllerConnectPolicy.cpp       Thu Feb  2 18:19:18 2006
@@ -22,11 +22,13 @@
 #include "ControllerConnectPolicy.hpp"
 
 #include "JavaSocket.hpp"
+#include "Connection.hpp"
 
 #include "ConnectionParameters.hpp"
 #include "Common.hpp"
 
 #include <map>
+#include <sstream>
 
 using std::wstring;
 
@@ -34,22 +36,20 @@
 
 CriticalSection AbstractControllerConnectPolicy::suspected_controllers_CS;
 
-// ControllerInfo comparison function
-struct SortByFailureNumber
+typedef struct
 {
-  bool operator()(const ControllerInfo& ci1, const ControllerInfo& ci2) const
-  {
-    return true;//(ci1.getFailureNumber() < ci2.getFailureNumber());
-  }
-};
-
-typedef std::map<ControllerInfo, JavaSocket, SortByFailureNumber> 
SuspectedList;
-// Static list of suspected controllers
+  ControllerInfo  controllerInfo;
+  JavaSocket*     pingSocketPtr;
+} SuspectController;
+
+/** Utility typedef to have a short name for suspect_list content type */
+typedef std::vector<SuspectController> SuspectList;
+/** The list of suspected controllers (static => per process) */
 namespace
 {
   /** list of controllers suspected of failure */
-  SuspectedList suspected_controllers;
-  /** # of controllers that have fail since this driver has been created */
+  SuspectList suspected_controllers;
+  /** # of controllers that have fail since this process has been created */
   int controller_failure_number = 0;
 };
 
@@ -68,24 +68,98 @@
   controller_list.clear();
 }
 
-void AbstractControllerConnectPolicy::updateSuspectList()
+/*static*/
+void AbstractControllerConnectPolicy::updateSuspectList(int select_timeout /* 
= 20 */)
 {
+  wstring fctName(L"AbstractControllerConnectPolicy::updateSuspectList");
+  if (suspected_controllers.size() < 1)
+    return;
+
   LockScope scLs(&suspected_controllers_CS);
-  for (SuspectedList::const_iterator iter = suspected_controllers.begin();
+  //create set of fds for select function
+  fd_set writableControllers;
+  FD_ZERO(&writableControllers);
+  struct timeval tv;
+  tv.tv_sec = 0;
+  tv.tv_usec = select_timeout;
+  int fdMax = -1, retVal = 0;
+  for (SuspectList::iterator iter = suspected_controllers.begin();
       iter != suspected_controllers.end(); iter++)
   {
-    //TODO:
-    //connect
-    //select
-    //foreach writable (in select result) => remove from list
+    try
+    {
+      //connect socket and add it to the list of fds
+      if 
((*iter).pingSocketPtr->connectTo((*iter).controllerInfo.getHostName(),
+                                           
(*iter).controllerInfo.getHostPort()))
+      {
+        int sock = (*iter).pingSocketPtr->getFd();
+        if (sock > fdMax)
+          fdMax = sock;
+        FD_SET(sock, &writableControllers);
+      }
+    }
+    catch (...){}
+  }
+  retVal = select(fdMax+1, NULL, &writableControllers, NULL, &tv);
+  if (retVal == -1)
+  {
+    if (isErrorEnabled())
+    {
+      logError(fctName, L"Select returned error #"+toWString(errno));
+    }
+    return;
+  }
+  // if (retVal == 0) => timeout, no controller up again, do nothing
+  // If controllers are found up again, let's remove them from the list of
+  // suspects
+  if (retVal > 0)
+  {
+    // Anti-perfomant-but-very-sure way to remove up-again controllers from the
+    // list of suspects
+    bool mayHaveControllerLeftToRemove = true;
+    while (mayHaveControllerLeftToRemove)
+    {
+      mayHaveControllerLeftToRemove = false;
+      for (SuspectList::iterator iter = suspected_controllers.begin();
+          iter != suspected_controllers.end(); iter++)
+      {
+        if (FD_ISSET((*iter).pingSocketPtr->getFd(), &writableControllers))
+        {
+          if (isDebugEnabled())
+          {
+            logDebug(fctName, L"Controller "
+                + static_cast<wstring>((*iter).controllerInfo)
+                + L" is up again. Removing it from the list of suspects (list 
size="
+                + toWString(suspected_controllers.size())+L")");
+          }
+          //send ping and close socket
+          (*iter).pingSocketPtr->writeJavaInt(Ping);
+          // For performance, we could:
+          //1.delete (*iter).pingSocketPtr;
+          //2.suspected_controllers.erase(iter);
+          // But we don't need perf in this part of code, so let's factorize
+          // and use our dedicated function
+          removeControllerFromSuspectList((*iter).controllerInfo);
+          mayHaveControllerLeftToRemove = true;
+          break;
+        }
+      }
+    }
   }
 }
 
+/*static*/
 bool AbstractControllerConnectPolicy::isSuspectedOfFailure(
     const ControllerInfo& controllerInfo)
 {
-  SuspectedList::const_iterator iter = 
suspected_controllers.find(controllerInfo);
-  return iter != suspected_controllers.end();
+  // Just compare controller info with each item in the whole list
+  for (SuspectList::const_iterator iter = suspected_controllers.begin();
+      iter != suspected_controllers.end(); iter++)
+  {
+    if ((*iter).controllerInfo == controllerInfo)
+      return true;
+  }
+  return false;
 }
 
 void AbstractControllerConnectPolicy::suspectControllerOfFailure(
@@ -97,42 +171,62 @@
   if (isSuspectedOfFailure(controllerInfo))
     return;
 
-  // Check that the controllerInfo is correct and add it to the list
+  // Add it to the list
   for (size_t i = 0; i < controller_list.size(); i++)
   {
-    ControllerInfo controller = controller_list[i];
-    if (controller == controllerInfo)
+    if (controllerInfo == controller_list[i])
     {
-      // prepare the ping operation to this controller
-      JavaSocket newSocket;
-      newSocket.create(false); //create non blocking
       LockScope scLs(&suspected_controllers_CS);
-      // assign a failure number to this controller
+      //just for reporting (in the future ?)
       controller_failure_number++;
-      //controllerInfo.setFailureNumber(controller_failure_number);
-      //add the controller and its prepared socket to the list
-      suspected_controllers[controllerInfo] = newSocket;
+      SuspectController newSuspect;
+      newSuspect.controllerInfo = controllerInfo;
+      // creates and prepares a new socket for pinging the controller
+      newSuspect.pingSocketPtr = new JavaSocket();
+      newSuspect.pingSocketPtr->create(false); //create non blocking
+      //add the controller and its socket to the list
+      suspected_controllers.push_back(newSuspect);
       if (isDebugEnabled())
-        logDebug(fctName, L"Controller " + (wstring)controllerInfo
-            + L" is now suspected of failure (#");
+        logDebug(fctName, L"Controller " + static_cast<wstring>(controllerInfo)
+            + L" is now suspected of failure (list size="
+            + toWString(suspected_controllers.size())+L")");
       return;
     }
   }
 }
 
+/*static*/
 void AbstractControllerConnectPolicy::removeControllerFromSuspectList(
-    const ControllerInfo& controller)
+    const ControllerInfo& controllerInfo)
 {
   wstring 
fctName(L"AbstractControllerConnectPolicy::removeControllerFromSuspectList");
   LockScope scLs(&suspected_controllers_CS);
-  suspected_controllers.erase(controller);
-  if (isDebugEnabled())
+
+  for (SuspectList::iterator iter = suspected_controllers.begin();
+      iter != suspected_controllers.end(); iter++)
+  {
+    if ((*iter).controllerInfo == controllerInfo)
+    {
+      // found it
+      delete (*iter).pingSocketPtr; //free the ping socket
+      suspected_controllers.erase(iter);
+      if (isDebugEnabled())
+      {
+        logDebug(fctName, L"Controller " + (wstring)controllerInfo
+            + L" has been removed from suspect list (list size="
+            + toWString(suspected_controllers.size())+L")");
+      }
+      return;
+    }
+  }
+  if (isWarningEnabled())
   {
-    logDebug(fctName, L"Controller " + (wstring)controller
-        + L" is removed from suspect list");
+    logWarning(fctName, L"Controller " + (wstring)controllerInfo
+        + L" is not (anymore?) in the suspect list");
   }
 }
 
+/*static*/
 int AbstractControllerConnectPolicy::getNumberOfFailures()
 {
   return controller_failure_number;
@@ -158,15 +252,21 @@
   LockScope ls(&policy_CS);
   {
     LockScope scLs(&suspected_controllers_CS);
-    if (suspected_controllers.size() == controller_list.size())
-      throw NoMoreControllerException(L"All "
-                                      + toWString(suspected_controllers.size())
-                                      + L" controllers down");
+    unsigned int testedControllers = 0;
     do
     {
       index = (index + 1) % controller_list.size();
+      testedControllers++;
+    }
+    while (isSuspectedOfFailure(controller_list[index])
+        && testedControllers <= controller_list.size());
+    // if there are on more controllers up => exception
+    if (testedControllers > controller_list.size())
+    {
+      throw NoMoreControllerException(L"All "
+                                      + toWString(suspected_controllers.size())
+                                      + L" controllers down");
     }
-    while (isSuspectedOfFailure(controller_list[index]));
   }
   if (isDebugEnabled())
     logDebug(fctName, L"Selected controller[" + toWString(index) + L"]:"

_______________________________________________
Carob-commits mailing list
[email protected]
https://forge.continuent.org/mailman/listinfo/carob-commits

Reply via email to