Date: Monday, December 4, 2006 @ 15:13:48
  Author: gilles
    Path: /cvsroot/carob/carob

Modified: include/ControllerPool.hpp (1.7 -> 1.8)
          include/ControllerStateChangedCallback.hpp (1.2 -> 1.3)
          include/JavaSocket.hpp (1.35 -> 1.36) src/ControllerInfo.cpp
          (1.4 -> 1.5) src/ControllerPool.cpp (1.11 -> 1.12)
          src/ControllerStateChangedCallback.cpp (1.3 -> 1.4)
          src/ControllerWatcher.cpp (1.5 -> 1.6) src/JavaSocket.cpp (1.58
          -> 1.59)

Removed the possibly non-thread-safe shutdown(socketfd) call and replaced it
with non-blocking-style reads using poll:
. The callback now takes JavaSocket pointers (instead of fds) and calls
shutdown() on these objects
. JavaSocket reads are now done after a poll() (or select), so they can be 
interrupted by calling shutdown(); they will then throw a SocketIOException

Centralized the poll call in a static function of the JavaSocket class

Fixed bug in ltPoolIndex: the second size test repeated '<' instead of '>', so
controller lists of different lengths could reach the element-wise comparison


--------------------------------------------+
 include/ControllerPool.hpp                 |    7 +-
 include/ControllerStateChangedCallback.hpp |    5 +
 include/JavaSocket.hpp                     |   26 ++++++++--
 src/ControllerInfo.cpp                     |    5 +
 src/ControllerPool.cpp                     |   11 ++--
 src/ControllerStateChangedCallback.cpp     |   16 +++---
 src/ControllerWatcher.cpp                  |   39 +--------------
 src/JavaSocket.cpp                         |   69 ++++++++++++++++++++++-----
 8 files changed, 109 insertions(+), 69 deletions(-)


Index: carob/include/ControllerPool.hpp
diff -u carob/include/ControllerPool.hpp:1.7 
carob/include/ControllerPool.hpp:1.8
--- carob/include/ControllerPool.hpp:1.7        Tue Nov 28 17:07:55 2006
+++ carob/include/ControllerPool.hpp    Mon Dec  4 15:13:48 2006
@@ -53,7 +53,7 @@
 
 class ControllerWatcher;
 class SocketKillerCallback;
-
+class JavaSocket;
 /**
  * Abstract class over each policy implementation, which is used by the driver
  * to choose a controller to connect to.
@@ -94,7 +94,8 @@
    * @param controller the controller to which the socket is connected
    * @param socketFd the socket file descriptor to register
    */
-  void                        registerSocket(const ControllerInfo& controller, 
int socketFd);
+  void                        registerSocket(const ControllerInfo& controller,
+                                  JavaSocket* socket);
 
   /**
    * Tell the watcher that a failure has been detected on the given 
controller.<br>
@@ -243,7 +244,7 @@
    * distribution
    * @throws DriverException if the given policy is invalid 
    */
-  static AbstractControllerPool&  getPool(std::vector<ControllerInfo> ctrls,
+  static AbstractControllerPool&  getPool(std::vector<ControllerInfo>& ctrls,
                                      ConnectPolicy cp, int pingDelayInMs,
                                      int controllerTimeoutInMs)
                                      throw (DriverException, 
UnexpectedException);
Index: carob/include/ControllerStateChangedCallback.hpp
diff -u carob/include/ControllerStateChangedCallback.hpp:1.2 
carob/include/ControllerStateChangedCallback.hpp:1.3
--- carob/include/ControllerStateChangedCallback.hpp:1.2        Wed Nov 29 
09:45:49 2006
+++ carob/include/ControllerStateChangedCallback.hpp    Mon Dec  4 15:13:48 2006
@@ -31,6 +31,7 @@
 namespace CarobNS {
 
 class AbstractControllerPool;
+class JavaSocket;
 
 /**
  * Provides the two methods that will be called when a controller is detected 
as
@@ -81,11 +82,11 @@
   /** Just informs policy that a controller came back */
   void onControllerUp(const ControllerInfo& ctrl);
   /** Adds the given socket to the list of monitored sockets for given 
controller */
-  void registerSocket(const ControllerInfo& ctrl, int socketFd);
+  void registerSocket(const ControllerInfo& ctrl, JavaSocket* socketFd);
 
 private:
   /** The list of controllers with their set of sockets */
-  std::map<ControllerInfo, std::vector<int> >  controllers_and_sockets;
+  std::map<ControllerInfo, std::vector<JavaSocket*> >  controllers_and_sockets;
   /** Policy that created us, to tell about controller state changes */
   AbstractControllerPool&  policy;
   /** controllers&sockets synchronization object */
Index: carob/include/JavaSocket.hpp
diff -u carob/include/JavaSocket.hpp:1.35 carob/include/JavaSocket.hpp:1.36
--- carob/include/JavaSocket.hpp:1.35   Fri Dec  1 14:56:24 2006
+++ carob/include/JavaSocket.hpp        Mon Dec  4 15:13:48 2006
@@ -169,20 +169,36 @@
   void          writeJavaBytes(int32_t length, java_byte* data) const
                     throw (SocketIOException, UnexpectedException);
   /**
-   * Gets the socket file descriptor
-   * @return the file descriptor
+   * Interrupts read operations on this socket, mark it as disconnected and
+   * makes read operations throw a SocketIOException
+   * @see #recvFully(void *buf, const int len, const int flags) const
    */
-  int           getFd() const { return socket_fd; }
+  void          shutdown() { connected = false; }
+  /**
+   * Execute poll (or select depending on compilation flag CAROB_USE_SELECT) on
+   * the given file descriptor and return after either data is ready to be read
+   * or given timeout is elapsed
+   * @param socketFd socket file descriptor on which to wait for incomming data
+   * @param pollTimeoutInMs timeout after which to return if no data is ready 
to
+   *                        be read on socket
+   * @return >0 if data is ready to be read, 0 if timeout reached, -1 if an
+   *         error occured
+   */
+  static int    pollOnSingleFd(int socketFd, int pollTimeoutInMs);
 protected:
   /**
-   * Substitute for recv. Loops/blocks until full length has been received.
+   * Substitute for recv. Waits for incomming data by calling pollOnSingleFd
+   * and loops until full length has been received or an erro occured. If
+   * shutdown() is called during the loop, throws a SocketIOException to inform
+   * callers that the socket is not longer readable
    * @param buf data to send
    * @param len full buffer length
    * @param flags send options, see recv man page
    * @return the total number of bytes send, -1 in case of failure
+   * @throw SocketIOException if interrupted by shutdown() function
    */
   int32_t       recvFully(void *buf, const int len, const int flags)
-                    const;
+                    const throw (SocketIOException, UnexpectedException);
   /**
    * Wrapper over send(...) function to handle errors and throw exceptions
    * @param fctName name of the calling function (for logging purposes)
Index: carob/src/ControllerInfo.cpp
diff -u carob/src/ControllerInfo.cpp:1.4 carob/src/ControllerInfo.cpp:1.5
--- carob/src/ControllerInfo.cpp:1.4    Fri Dec  1 15:07:42 2006
+++ carob/src/ControllerInfo.cpp        Mon Dec  4 15:13:48 2006
@@ -181,7 +181,7 @@
   int error = getnameinfo(&UDP_addr, UDP_addr_len, hostname, sizeof(hostname), 
NULL, 0, NI_NUMERICHOST);
   if (error != 0)
   {
-    wstring wMsg(L"Error while executing getnameinfo() saf=" + 
toWString(UDP_addr.sa_family)  + L" inet="+toWString(PF_INET) + 
L"inet6="+toWString(PF_INET6) + L" afinet="+toWString(AF_INET) + 
L"afinet6="+toWString(AF_INET6));
+    wstring wMsg(L"Error while executing getnameinfo()");
 #ifndef __MINGW32__
     const char* gaiMsg = gai_strerror(error);
     wMsg += L". Error was: " + fromString(gaiMsg);
@@ -212,7 +212,7 @@
   }
   socketPtr->create(family);
   // register the socket to our policy so it can be killed during connection
-  pool.registerSocket(*this, socketPtr->getFd());
+  pool.registerSocket(*this, socketPtr);
   
   if (isDebugEnabled())
     logDebug(fctName, L"Connecting...");
@@ -244,6 +244,7 @@
     addrIter = addrIter->ai_next;
   }
   // could not connect to any of the addresses
+  freeaddrinfo(addr);
   throw ConnectionException(L"Unable to connect. Last error code was "
       + toUserString(errno));
 }
Index: carob/src/ControllerPool.cpp
diff -u carob/src/ControllerPool.cpp:1.11 carob/src/ControllerPool.cpp:1.12
--- carob/src/ControllerPool.cpp:1.11   Thu Nov 30 21:10:28 2006
+++ carob/src/ControllerPool.cpp        Mon Dec  4 15:13:48 2006
@@ -74,6 +74,8 @@
   }
   catch (ConnectionException cause)
   {
+    alive_controllers.clear();
+    delete callback_ptr;
     throw DriverException(
         L"Controller watcher creation failed!");
   }
@@ -85,9 +87,10 @@
   alive_controllers.clear();
 }
 
-void AbstractControllerPool::registerSocket(const ControllerInfo& controller, 
int socketFd)
+void AbstractControllerPool::registerSocket(const ControllerInfo& controller,
+    JavaSocket* socket)
 {
-  callback_ptr->registerSocket(controller, socketFd);
+  callback_ptr->registerSocket(controller, socket);
 }
 
 void AbstractControllerPool::forceControllerDown(const ControllerInfo& 
controller)
@@ -174,7 +177,7 @@
   // same policy, compare number of controllers
   if (pi1.controllers.size() < pi2.controllers.size())
     return true;
-  else if (pi1.controllers.size() < pi2.controllers.size())
+  else if (pi1.controllers.size() > pi2.controllers.size())
     return false;
   // same size, compare controllers themselves
   vector<ControllerInfo>::const_iterator iter_pi1 = pi1.controllers.begin();
@@ -198,7 +201,7 @@
 std::map<PoolIndex, AbstractControllerPool*, ltPoolIndex>  
ControllerPoolManager::pool_map;
 
 /*static*/
-AbstractControllerPool& ControllerPoolManager::getPool(vector<ControllerInfo> 
ctrls,
+AbstractControllerPool& ControllerPoolManager::getPool(vector<ControllerInfo>& 
ctrls,
     ConnectPolicy cp, int pingDelayInMs, int controllerTimeoutInMs)
     throw (DriverException, UnexpectedException)
 {
Index: carob/src/ControllerStateChangedCallback.cpp
diff -u carob/src/ControllerStateChangedCallback.cpp:1.3 
carob/src/ControllerStateChangedCallback.cpp:1.4
--- carob/src/ControllerStateChangedCallback.cpp:1.3    Fri Dec  1 15:47:31 2006
+++ carob/src/ControllerStateChangedCallback.cpp        Mon Dec  4 15:13:48 2006
@@ -23,6 +23,7 @@
 #include "ControllerStateChangedCallback.hpp"
 
 #include "ControllerInfo.hpp"
+#include "JavaSocket.hpp"
 #include "ControllerPool.hpp"
 #include "Common.hpp"
 
@@ -40,7 +41,7 @@
 
 SocketKillerCallback::~SocketKillerCallback()
 {
-  for (map<ControllerInfo, vector<int> >::iterator iter = 
controllers_and_sockets.begin();
+  for (map<ControllerInfo, vector<JavaSocket*> >::iterator iter = 
controllers_and_sockets.begin();
       iter != controllers_and_sockets.end(); iter++)
   {
     iter->second.clear();
@@ -48,11 +49,11 @@
   controllers_and_sockets.clear();  
 }
 
-void SocketKillerCallback::registerSocket(const ControllerInfo& ctrl, int s)
+void SocketKillerCallback::registerSocket(const ControllerInfo& ctrl, 
JavaSocket* s)
 {
   LockScope ls(&controllers_and_sockets_CS);
-  vector<int> sockets;
-  map<ControllerInfo, vector<int> >::iterator iter = 
controllers_and_sockets.find(ctrl);
+  vector<JavaSocket*> sockets;
+  map<ControllerInfo, vector<JavaSocket*> >::iterator iter = 
controllers_and_sockets.find(ctrl);
   if (iter != controllers_and_sockets.end())
   {
     sockets = iter->second;
@@ -67,17 +68,18 @@
   // tell policy asap
   policy.controllerDown(ctrl);
   LockScope ls(&controllers_and_sockets_CS);
-  map<ControllerInfo, vector<int> >::iterator iter = 
controllers_and_sockets.find(ctrl);
+  map<ControllerInfo, vector<JavaSocket*> >::iterator iter = 
controllers_and_sockets.find(ctrl);
   if (iter != controllers_and_sockets.end())
   {
     if (isDebugEnabled())
       logDebug(fctName, L"Shutting down sockets connected to controller "
           + static_cast<wstring>(ctrl));
-    for (vector<int>::iterator socketIter = iter->second.begin();
+    for (vector<JavaSocket*>::iterator socketIter = iter->second.begin();
         socketIter != iter->second.end(); socketIter++)
     {
-      shutdown(*socketIter, SHUT_RDWR);
+      (*socketIter)->shutdown();
     }
+    // Clean-up shut down sockets
     iter->second.clear();
   }
 }
Index: carob/src/ControllerWatcher.cpp
diff -u carob/src/ControllerWatcher.cpp:1.5 carob/src/ControllerWatcher.cpp:1.6
--- carob/src/ControllerWatcher.cpp:1.5 Thu Nov 30 21:10:28 2006
+++ carob/src/ControllerWatcher.cpp     Mon Dec  4 15:13:48 2006
@@ -24,6 +24,7 @@
 #include "ControllerPingSender.hpp"
 #include "ControllerStateChangedCallback.hpp"
 #include "WatchedControllers.hpp"
+#include "JavaSocket.hpp"
 #include "ControllerInfo.hpp"
 #include "Common.hpp"
 
@@ -31,13 +32,6 @@
 
 #include <unistd.h> // usleep()
 
-#ifndef CAROB_USE_SELECT
-#include <sys/poll.h>
-#define POLL_FUNCTION L"Poll"
-#else
-#define POLL_FUNCTION L"Select"
-#endif
-
 using namespace CarobNS;
 using std::vector;
 using std::wstring;
@@ -107,7 +101,7 @@
     throw ConnectionException(L"Could not turn ping socket to non-blocking 
mode.");
     }
 #endif
-  
+
   pinger_ptr = new ControllerPingSender(*controllers_ptr, socket_fd, 
ping_delay_in_ms);
   pthread_create(&pinger_thread, NULL, PingSenderThread, pinger_ptr);
 }
@@ -162,33 +156,8 @@
   while (!is_stopped)
   {
     // return of poll/select function
-    int retVal = 0;
-#ifndef CAROB_USE_SELECT
-    // create poll fd
-    struct pollfd pfd;
-    pfd.fd = socket_fd;
-    pfd.events = POLLIN|POLLPRI;
-    // The pfd.revents is is going to be modified
-    retVal = poll(&pfd, 1, ping_delay_in_ms);
-#else
-    // create select fd
-    fd_set sfd;
-    FD_ZERO(&sfd);
-    struct timeval tv;
-    tv.tv_sec = 0;
-    tv.tv_usec = ping_delay_in_ms*1000; // milli -> micro seconds conversion
-    FD_SET(socket_fd, &sfd);
-    retVal = select(socket_fd+1, &sfd, NULL, NULL, &tv);
-#endif
-    if (retVal == -1)
-    {
-      if (isWarnEnabled())
-      {
-        logWarn(fctName, wstring(POLL_FUNCTION) + L" returned error 
#"+toUserString(errno));
-      }
-      // clean-up will be done a the end of this function
-    }
-    // if (retVal == 0) => timeout, no respond, do nothing
+    int retVal = JavaSocket::pollOnSingleFd(socket_fd, ping_delay_in_ms);
+    // if (retVal == 0) => timeout, no response, do nothing
     if (retVal > 0)
     {
       // process all responses received until now
Index: carob/src/JavaSocket.cpp
diff -u carob/src/JavaSocket.cpp:1.58 carob/src/JavaSocket.cpp:1.59
--- carob/src/JavaSocket.cpp:1.58       Fri Dec  1 14:56:24 2006
+++ carob/src/JavaSocket.cpp    Mon Dec  4 15:13:48 2006
@@ -35,6 +35,13 @@
 #include <errno.h>
 #include <fcntl.h>
 
+#ifndef CAROB_USE_SELECT
+#include <sys/poll.h>
+#define POLL_FUNCTION L"Poll"
+#else
+#define POLL_FUNCTION L"Select"
+#endif
+
 using std::wstring;
 
 using namespace CarobNS;
@@ -271,26 +278,66 @@
   sendToSocket(fctName, L"JavaBytes", data, length, SOCKET_SEND_FLAGS);
 }
 
+/* static */
+int JavaSocket::pollOnSingleFd(int socketFd, int pollTimeoutInMs)
+{
+  // return of poll/select function
+  int retVal = 0;
+#ifndef CAROB_USE_SELECT
+  // create poll fd
+  struct pollfd pfd;
+  pfd.fd = socketFd;
+  pfd.events = POLLIN|POLLPRI;
+  // The pfd.revents is is going to be modified
+  retVal = poll(&pfd, 1, pollTimeoutInMs);
+#else
+  // create select fd
+  fd_set sfd;
+  FD_ZERO(&sfd);
+  struct timeval tv;
+  tv.tv_sec = 0;
+  tv.tv_usec = pollTimeout*1000; // milli -> micro seconds conversion
+  FD_SET(socket_fd, &sfd);
+  retVal = select(socket_fd+1, &sfd, NULL, NULL, &tv);
+#endif
+  if (retVal == -1)
+  {
+    if (isWarnEnabled())
+    {
+      logWarn(L"JavaSocket::pollOnSingleFd", wstring(POLL_FUNCTION) + L" 
returned error #"+toUserString(errno));
+    }
+  }
+  return retVal;
+}
+
 /** Returns the number of octets read or the (negative) error from recv() */
 int JavaSocket::recvFully(void *buf, const int len, const int flags)
-    const
+    const throw (SocketIOException, UnexpectedException)
 {
   int alreadyRead = 0; //Keeps the whole size read 
   int readThisTime = 0; //The size read at each receive
-  while (alreadyRead < len)
+  while (connected && alreadyRead < len)
   {
-    readThisTime = recv(socket_fd,
-                        (char*)(static_cast<uint8_t*>(
-                                      
((static_cast<uint8_t*>(buf))+alreadyRead))),
-                        len-alreadyRead,
-                        flags);
-    if (readThisTime <= 0)
+    int retVal = pollOnSingleFd(socket_fd, 1000); // 1000 = 1 second timeout
+    // if (retVal == 0) => timeout, no response, do nothing
+    if (retVal > 0) // data is ready to be read
     {
-      //this is an error
-      return readThisTime;
+      readThisTime = recv(socket_fd,
+                          (char*)(static_cast<uint8_t*>(
+                                        
((static_cast<uint8_t*>(buf))+alreadyRead))),
+                          len-alreadyRead,
+                          flags);
+      if (readThisTime <= 0)
+      {
+        //this is an error
+        return readThisTime;
+      }
+      alreadyRead += readThisTime;
     }
-    alreadyRead += readThisTime;
   }
+  // We were shutdown()
+  if (!connected)
+    throw SocketIOException(L"JavaSocket shut down, read aborted");
   return alreadyRead;
 }
 

_______________________________________________
Carob-commits mailing list
[email protected]
https://forge.continuent.org/mailman/listinfo/carob-commits

Reply via email to