Date: Monday, December 18, 2006 @ 17:02:51
  Author: gilles
    Path: /cvsroot/carob/carob

Modified: include/JavaSocket.hpp (1.36 -> 1.37) include/StringCodecs.hpp
          (1.16 -> 1.17) src/ControllerInfo.cpp (1.6 -> 1.7)
          src/JavaSocket.cpp (1.60 -> 1.61)

Introduced non-blocking connect() to handle connections to controllers that
are dead but have not yet been detected as such.
Prettified error messages coming from errno (show the error string instead of
the error number).


--------------------------+
 include/JavaSocket.hpp   |   17 +++++-
 include/StringCodecs.hpp |   11 +++
 src/ControllerInfo.cpp   |    3 -
 src/JavaSocket.cpp       |  126 ++++++++++++++++++++++++++++++++++++++-------
 4 files changed, 133 insertions(+), 24 deletions(-)


Index: carob/include/JavaSocket.hpp
diff -u carob/include/JavaSocket.hpp:1.36 carob/include/JavaSocket.hpp:1.37
--- carob/include/JavaSocket.hpp:1.36   Mon Dec  4 15:13:48 2006
+++ carob/include/JavaSocket.hpp        Mon Dec 18 17:02:51 2006
@@ -173,7 +173,7 @@
    * makes read operations throw a SocketIOException
    * @see #recvFully(void *buf, const int len, const int flags) const
    */
-  void          shutdown() { connected = false; }
+  void          shutdown() { canceled = true; }
   /**
    * Execute poll (or select depending on compilation flag CAROB_USE_SELECT) on
    * the given file descriptor and return after either data is ready to be read
@@ -181,10 +181,11 @@
    * @param socketFd socket file descriptor on which to wait for incomming data
    * @param pollTimeoutInMs timeout after which to return if no data is ready 
to
    *                        be read on socket
+   * @param pollOnWrites true makes poll on write (false by default = on read)
    * @return >0 if data is ready to be read, 0 if timeout reached, -1 if an
    *         error occured
    */
-  static int    pollOnSingleFd(int socketFd, int pollTimeoutInMs);
+  static int    pollOnSingleFd(int socketFd, int pollTimeoutInMs, bool 
pollOnWrites = false);
 protected:
   /**
    * Substitute for recv. Waits for incomming data by calling pollOnSingleFd
@@ -231,6 +232,18 @@
   int           socket_fd;
   /** true if the socket is connected to a host */
   bool          connected;
+  /** true cancels all input and connections attempts */
+  bool          canceled;
+
+  /**
+   * Sets the socket to desired blocking mode
+   * 
+   * @param fctName calling function name to display debug info
+   * @param blocking true to set to blocking mode, false to non-blocking
+   * @throw ConnectionException if the operation fails
+   */
+  void          setBlockingMode(const std::wstring& fctName, bool blocking)
+                    throw (ConnectionException, UnexpectedException);
 
   /**
    * Function converts the unsigned 64bit integer from network byte order to 
host byte order.
Index: carob/include/StringCodecs.hpp
diff -u carob/include/StringCodecs.hpp:1.16 carob/include/StringCodecs.hpp:1.17
--- carob/include/StringCodecs.hpp:1.16 Mon Dec  4 23:14:10 2006
+++ carob/include/StringCodecs.hpp      Mon Dec 18 17:02:51 2006
@@ -181,7 +181,11 @@
 class StaticCodecs
 {
 public:
-  /** Converts ASCII "encoded" string to a wide string */
+  /**
+   * Converts ASCII "encoded" string to a wide string.<br>
+   * To be used only when the input string is definitely ASCII
+   * @see #fromString(const std::string&)
+   */
   static std::wstring fromASCII(const std::string& in) throw (CodecException)
   { return ascii_codec.decode(in); }
   /** Converts UTF-8 encoded string to a wide string */
@@ -190,7 +194,10 @@
   /** Converts a wide string to a UTF-8 encoded string */
   static std::string toUTF8(const std::wstring& in) throw (CodecException)
   { return utf8_codec.encode(in); }
-  /** Converts user's locale encoded string to a wide string */
+  /**
+   * Converts user's locale encoded string to a wide string<br>
+   * To be used for conversions of locale dependent strings, eg. system 
messages
+   */
   static std::wstring fromString(const std::string& in) throw (CodecException)
   { return user_codec.decode(in); }
   /** Converts a wide string to a user's locale encoded string */
Index: carob/src/ControllerInfo.cpp
diff -u carob/src/ControllerInfo.cpp:1.6 carob/src/ControllerInfo.cpp:1.7
--- carob/src/ControllerInfo.cpp:1.6    Mon Dec 11 19:14:45 2006
+++ carob/src/ControllerInfo.cpp        Mon Dec 18 17:02:51 2006
@@ -258,8 +258,7 @@
   pool.unRegisterSocket(*this, socketPtr);
   delete socketPtr;
   freeaddrinfo(addr);
-  throw ConnectionException(L"Unable to connect. Last error code was "
-      + toUserString(errno));
+  throw ConnectionException(L"Connection failed");
 }
 
 bool ControllerInfo::operator ==(const ControllerInfo& ci) const
Index: carob/src/JavaSocket.cpp
diff -u carob/src/JavaSocket.cpp:1.60 carob/src/JavaSocket.cpp:1.61
--- carob/src/JavaSocket.cpp:1.60       Mon Dec  4 16:25:02 2006
+++ carob/src/JavaSocket.cpp    Mon Dec 18 17:02:51 2006
@@ -48,9 +48,10 @@
 
 using namespace CarobNS;
 
-JavaSocket::JavaSocket() : 
+JavaSocket::JavaSocket() :
 socket_fd(-1),
-connected(false)
+connected(false),
+canceled(false)
 {
 }
 
@@ -76,6 +77,7 @@
   wstring fctName(L"JavaSocket::Create");
   if (isDebugEnabled())
     logDebug(fctName, L"Creating socket...");
+
   socket_fd = socket(domain, SOCK_STREAM, 0);
 
   if (socket_fd == -1)
@@ -85,19 +87,25 @@
     throw ConnectionException(L"Socket creation failed");
   }
 
-  //Set socket options
+  // Set socket options
+  // 1. non-blocking
+  setBlockingMode(fctName, false);
+  // 2. Disable nagle algorithm
   int opt_value = 1;
   if (setsockopt(socket_fd, IPPROTO_TCP, TCP_NODELAY,
                  reinterpret_cast<char*>(&opt_value), sizeof opt_value) == -1
 #if CAROB_USE_SO_NOSIGPIPE
+  // 3. Disable sigpipe
       || setsockopt(socket_fd, SOL_SOCKET, SO_NOSIGPIPE,
                     reinterpret_cast<char*>(&opt_value), sizeof (opt_value) == 
-1)
 #endif
     )
   {
+    wstring msg = L"Could not set socket options: setsockopt returned error: ";
+    msg += StaticCodecs::fromString(strerror(errno));
     if (isErrorEnabled())
-      logError(fctName, L"Socket set option failed");
-    throw ConnectionException(L"Set option failed on socket");
+      logError(fctName, msg);
+    throw ConnectionException(msg);
   }
 
   if (isDebugEnabled())
@@ -119,13 +127,64 @@
   sockaddr_in addr = {0};
   addr = *(reinterpret_cast<struct sockaddr_in*>(ai->ai_addr));
   addr.sin_port = htons(port);
-  
+
+  // try non blocking connect
   int resp = ::connect(socket_fd, reinterpret_cast<sockaddr*>(&addr), 
sizeof(addr));
   if (resp==0)
   {
+    // connection succeeded, set back to blocking mode
+    setBlockingMode(fctName, true);
     connected = true;
     return true;
   }
+  if (errno == EINPROGRESS)
+  {
+    // connection is not established yet. Loop until connection ok, error or 
cancelation
+
+    if (isDebugEnabled())
+      logDebug(fctName, L"Socket connect() in progress... polling");
+    while (!canceled)
+    {
+      int retVal = pollOnSingleFd(socket_fd, 1000, true); // 1000 = 1 second 
timeout
+      if (retVal > 0)
+      {
+        // Check that the socket is not in error:
+        int32_t sockErr;
+        socklen_t sockErrLen = sizeof(sockErr);
+        if (getsockopt(socket_fd, SOL_SOCKET, SO_ERROR, (void*)(&sockErr), 
&sockErrLen) < 0)
+        {
+          wstring msg = L"Could not get socket option: getsockopt returned 
error: ";
+          msg += StaticCodecs::fromString(strerror(errno));
+          if (isErrorEnabled())
+            logError(fctName, msg);
+          throw ConnectionException(msg);
+        }
+        // If the error code returned is 0 => connect ok
+        if (sockErr == 0)
+        {
+          if (isDebugEnabled())
+            logDebug(fctName, L"Connection succeeded");
+          // connection succeeded, set back to blocking mode
+          setBlockingMode(fctName, true);
+          connected = true;
+          return true;
+        }
+        else
+        {
+          wstring msg = L"connect() returned error: ";
+          msg += StaticCodecs::fromString(strerror(sockErr));
+          if (isErrorEnabled())
+            logError(fctName, msg);
+          throw ConnectionException(msg);
+        }
+      }
+    }
+  }
+  if (canceled && isDebugEnabled())
+    logDebug(fctName, L"Connection interrupted.");
+
+  // At this point, we either got an error or were canceled. Return an error
+  // anyway
   return false;
 }
 
@@ -140,11 +199,11 @@
     if (close(socket_fd) != 0)
 #endif
     {
-      wstring msg(L"Could not close socket. Error code is ");
+      wstring msg(L"Could not close socket. Error is ");
 #ifdef __MINGW32__
       msg += toUserString(WSAGetLastError());
 #else
-      msg += toUserString(errno);
+      msg += StaticCodecs::fromString(strerror(errno));;
 #endif
       throw SocketIOException(msg);
       return false;
@@ -169,7 +228,7 @@
 
   int32_t netlen = htonl(utf8str.length());
 
-  //First write number of bytes to follow as 
+  // First write number of bytes to follow as
   sendToSocket(fctName, L"UTF string", &netlen, sizeof(netlen), 
SOCKET_SEND_FLAGS);
   sendToSocket(fctName, L"UTF string", utf8str.data(), utf8str.length(), 
SOCKET_SEND_FLAGS);
 
@@ -180,7 +239,7 @@
   throw (SocketIOException, CodecException, UnexpectedException)
 {
   wstring fctName(L"JavaSocket::readJavaUTF");
-  
+
   int32_t lenRecNet;
   //the converted size
   uint16_t lenRec;
@@ -220,7 +279,7 @@
   // the data will be received with network byte order.
   // so we must convert it after reception
   int32_t rec = 0;
-  
+
   receiveFromSocket(fctName, L"Int32", &rec, sizeof(rec), 0);
   i = ntohl(rec);
 }
@@ -241,7 +300,7 @@
   // the data will be received with network byte order.
   // so we must convert it after reception
   int64_t rec = 0;
-  
+
   receiveFromSocket(fctName, L"Int64", &rec, sizeof(rec), 0);
   i = ntohll(rec);
 }
@@ -281,7 +340,7 @@
 }
 
 /* static */
-int JavaSocket::pollOnSingleFd(int socketFd, int pollTimeoutInMs)
+int JavaSocket::pollOnSingleFd(int socketFd, int pollTimeoutInMs, bool 
pollOnWrites/*=false*/)
 {
   // return of poll/select function
   int retVal = 0;
@@ -289,7 +348,10 @@
   // create poll fd
   struct pollfd pfd;
   pfd.fd = socketFd;
-  pfd.events = POLLIN|POLLPRI;
+  if (pollOnWrites)
+    pfd.events = POLLOUT;
+  else
+    pfd.events = POLLIN|POLLPRI;
   // The pfd.revents is is going to be modified
   retVal = poll(&pfd, 1, pollTimeoutInMs);
 #else
@@ -306,7 +368,9 @@
   {
     if (isWarnEnabled())
     {
-      logWarn(L"JavaSocket::pollOnSingleFd", wstring(POLL_FUNCTION) + L" 
returned error #"+toUserString(errno));
+      logWarn(L"JavaSocket::pollOnSingleFd",
+          wstring(POLL_FUNCTION) + L" returned error: "
+          + StaticCodecs::fromString(strerror(errno)));
     }
   }
   return retVal;
@@ -316,9 +380,9 @@
 int JavaSocket::recvFully(void *buf, const int len, const int flags)
     const throw (SocketIOException, UnexpectedException)
 {
-  int alreadyRead = 0; //Keeps the whole size read 
+  int alreadyRead = 0; //Keeps the whole size read
   int readThisTime = 0; //The size read at each receive
-  while (connected && alreadyRead < len)
+  while (!canceled && alreadyRead < len)
   {
     int retVal = pollOnSingleFd(socket_fd, 1000); // 1000 = 1 second timeout
     // if (retVal == 0) => timeout, no response, do nothing
@@ -384,9 +448,35 @@
   //  throw SocketIOException(fctName + L"something is really wrong!");
 }
 
+void JavaSocket::setBlockingMode(const wstring& fctName, bool blocking)
+    throw (ConnectionException, UnexpectedException)
+{
+  long fcntlopt = fcntl(socket_fd, F_GETFL, NULL);
+  if (fcntlopt < 0)
+  {
+    wstring msg = L"Could not get socket options: fcntl returned error: ";
+    msg += StaticCodecs::fromString(strerror(errno));
+    if (isErrorEnabled())
+      logError(fctName, msg);
+    throw ConnectionException(msg);
+  }
+  if (blocking)
+    fcntlopt &= (~O_NONBLOCK);
+  else
+    fcntlopt |= O_NONBLOCK;
+  if(fcntl(socket_fd, F_SETFL, fcntlopt) < 0)
+  {
+    wstring msg = L"Could not set socket options: fcntl returned error: ";
+    msg += StaticCodecs::fromString(strerror(errno));
+    if (isErrorEnabled())
+      logError(fctName, msg);
+    throw ConnectionException(msg);
+  }
+}
+
 const uint64_t JavaSocket::ntohll(const uint64_t &n) const
 {
-#ifndef __BYTE_ORDER // typically found through <sys/param.h> 
+#ifndef __BYTE_ORDER // typically found through <sys/param.h>
                      // or <netinet/*.h>
 #error "__BYTE_ORDER undefined: unknown endianness, can't implement 64bits 
swaps"
 #endif

_______________________________________________
Carob-commits mailing list
[email protected]
https://forge.continuent.org/mailman/listinfo/carob-commits

Reply via email to