Date: Monday, December 18, 2006 @ 17:02:51
Author: gilles
Path: /cvsroot/carob/carob
Modified: include/JavaSocket.hpp (1.36 -> 1.37) include/StringCodecs.hpp
(1.16 -> 1.17) src/ControllerInfo.cpp (1.6 -> 1.7)
src/JavaSocket.cpp (1.60 -> 1.61)
Introduced non-blocking connect() to handle
controllers-dead-but-not-yet-detected controller connections
Prettyfied error messages comming from errno (get the string instead of the
error #)
--------------------------+
include/JavaSocket.hpp | 17 +++++-
include/StringCodecs.hpp | 11 +++
src/ControllerInfo.cpp | 3 -
src/JavaSocket.cpp | 126 ++++++++++++++++++++++++++++++++++++++-------
4 files changed, 133 insertions(+), 24 deletions(-)
Index: carob/include/JavaSocket.hpp
diff -u carob/include/JavaSocket.hpp:1.36 carob/include/JavaSocket.hpp:1.37
--- carob/include/JavaSocket.hpp:1.36 Mon Dec 4 15:13:48 2006
+++ carob/include/JavaSocket.hpp Mon Dec 18 17:02:51 2006
@@ -173,7 +173,7 @@
* makes read operations throw a SocketIOException
* @see #recvFully(void *buf, const int len, const int flags) const
*/
- void shutdown() { connected = false; }
+ void shutdown() { canceled = true; }
/**
* Execute poll (or select depending on compilation flag CAROB_USE_SELECT) on
* the given file descriptor and return after either data is ready to be read
@@ -181,10 +181,11 @@
* @param socketFd socket file descriptor on which to wait for incomming data
* @param pollTimeoutInMs timeout after which to return if no data is ready
to
* be read on socket
+ * @param pollOnWrites true makes poll on write (false by default = on read)
* @return >0 if data is ready to be read, 0 if timeout reached, -1 if an
* error occured
*/
- static int pollOnSingleFd(int socketFd, int pollTimeoutInMs);
+ static int pollOnSingleFd(int socketFd, int pollTimeoutInMs, bool
pollOnWrites = false);
protected:
/**
* Substitute for recv. Waits for incomming data by calling pollOnSingleFd
@@ -231,6 +232,18 @@
int socket_fd;
/** true if the socket is connected to a host */
bool connected;
+ /** true cancels all input and connections attempts */
+ bool canceled;
+
+ /**
+ * Sets the socket to desired blocking mode
+ *
+ * @param fctName calling function name to display debug info
+ * @param blocking true to set to blocking mode, false to non-blocking
+ * @throw ConnectionException if the operation fails
+ */
+ void setBlockingMode(const std::wstring& fctName, bool blocking)
+ throw (ConnectionException, UnexpectedException);
/**
* Function converts the unsigned 64bit integer from network byte order to
host byte order.
Index: carob/include/StringCodecs.hpp
diff -u carob/include/StringCodecs.hpp:1.16 carob/include/StringCodecs.hpp:1.17
--- carob/include/StringCodecs.hpp:1.16 Mon Dec 4 23:14:10 2006
+++ carob/include/StringCodecs.hpp Mon Dec 18 17:02:51 2006
@@ -181,7 +181,11 @@
class StaticCodecs
{
public:
- /** Converts ASCII "encoded" string to a wide string */
+ /**
+ * Converts ASCII "encoded" string to a wide string.<br>
+ * To be used only when the input string is definitely ASCII
+ * @see #fromString(const std::string&)
+ */
static std::wstring fromASCII(const std::string& in) throw (CodecException)
{ return ascii_codec.decode(in); }
/** Converts UTF-8 encoded string to a wide string */
@@ -190,7 +194,10 @@
/** Converts a wide string to a UTF-8 encoded string */
static std::string toUTF8(const std::wstring& in) throw (CodecException)
{ return utf8_codec.encode(in); }
- /** Converts user's locale encoded string to a wide string */
+ /**
+ * Converts user's locale encoded string to a wide string<br>
+ * To be used for conversions of locale dependent strings, eg. system
messages
+ */
static std::wstring fromString(const std::string& in) throw (CodecException)
{ return user_codec.decode(in); }
/** Converts a wide string to a user's locale encoded string */
Index: carob/src/ControllerInfo.cpp
diff -u carob/src/ControllerInfo.cpp:1.6 carob/src/ControllerInfo.cpp:1.7
--- carob/src/ControllerInfo.cpp:1.6 Mon Dec 11 19:14:45 2006
+++ carob/src/ControllerInfo.cpp Mon Dec 18 17:02:51 2006
@@ -258,8 +258,7 @@
pool.unRegisterSocket(*this, socketPtr);
delete socketPtr;
freeaddrinfo(addr);
- throw ConnectionException(L"Unable to connect. Last error code was "
- + toUserString(errno));
+ throw ConnectionException(L"Connection failed");
}
bool ControllerInfo::operator ==(const ControllerInfo& ci) const
Index: carob/src/JavaSocket.cpp
diff -u carob/src/JavaSocket.cpp:1.60 carob/src/JavaSocket.cpp:1.61
--- carob/src/JavaSocket.cpp:1.60 Mon Dec 4 16:25:02 2006
+++ carob/src/JavaSocket.cpp Mon Dec 18 17:02:51 2006
@@ -48,9 +48,10 @@
using namespace CarobNS;
-JavaSocket::JavaSocket() :
+JavaSocket::JavaSocket() :
socket_fd(-1),
-connected(false)
+connected(false),
+canceled(false)
{
}
@@ -76,6 +77,7 @@
wstring fctName(L"JavaSocket::Create");
if (isDebugEnabled())
logDebug(fctName, L"Creating socket...");
+
socket_fd = socket(domain, SOCK_STREAM, 0);
if (socket_fd == -1)
@@ -85,19 +87,25 @@
throw ConnectionException(L"Socket creation failed");
}
- //Set socket options
+ // Set socket options
+ // 1. non-blocking
+ setBlockingMode(fctName, false);
+ // 2. Disable nagle algorithm
int opt_value = 1;
if (setsockopt(socket_fd, IPPROTO_TCP, TCP_NODELAY,
reinterpret_cast<char*>(&opt_value), sizeof opt_value) == -1
#if CAROB_USE_SO_NOSIGPIPE
+ // 3. Disable sigpipe
|| setsockopt(socket_fd, SOL_SOCKET, SO_NOSIGPIPE,
reinterpret_cast<char*>(&opt_value), sizeof (opt_value) ==
-1)
#endif
)
{
+ wstring msg = L"Could not set socket options: setsockopt returned error: ";
+ msg += StaticCodecs::fromString(strerror(errno));
if (isErrorEnabled())
- logError(fctName, L"Socket set option failed");
- throw ConnectionException(L"Set option failed on socket");
+ logError(fctName, msg);
+ throw ConnectionException(msg);
}
if (isDebugEnabled())
@@ -119,13 +127,64 @@
sockaddr_in addr = {0};
addr = *(reinterpret_cast<struct sockaddr_in*>(ai->ai_addr));
addr.sin_port = htons(port);
-
+
+ // try non blocking connect
int resp = ::connect(socket_fd, reinterpret_cast<sockaddr*>(&addr),
sizeof(addr));
if (resp==0)
{
+ // connection succeeded, set back to blocking mode
+ setBlockingMode(fctName, true);
connected = true;
return true;
}
+ if (errno == EINPROGRESS)
+ {
+ // connection is not established yet. Loop until connection ok, error or
cancelation
+
+ if (isDebugEnabled())
+ logDebug(fctName, L"Socket connect() in progress... polling");
+ while (!canceled)
+ {
+ int retVal = pollOnSingleFd(socket_fd, 1000, true); // 1000 = 1 second
timeout
+ if (retVal > 0)
+ {
+ // Check that the socket is not in error:
+ int32_t sockErr;
+ socklen_t sockErrLen = sizeof(sockErr);
+ if (getsockopt(socket_fd, SOL_SOCKET, SO_ERROR, (void*)(&sockErr),
&sockErrLen) < 0)
+ {
+ wstring msg = L"Could not get socket option: getsockopt returned
error: ";
+ msg += StaticCodecs::fromString(strerror(errno));
+ if (isErrorEnabled())
+ logError(fctName, msg);
+ throw ConnectionException(msg);
+ }
+ // If the error code returned is 0 => connect ok
+ if (sockErr == 0)
+ {
+ if (isDebugEnabled())
+ logDebug(fctName, L"Connection succeeded");
+ // connection succeeded, set back to blocking mode
+ setBlockingMode(fctName, true);
+ connected = true;
+ return true;
+ }
+ else
+ {
+ wstring msg = L"connect() returned error: ";
+ msg += StaticCodecs::fromString(strerror(sockErr));
+ if (isErrorEnabled())
+ logError(fctName, msg);
+ throw ConnectionException(msg);
+ }
+ }
+ }
+ }
+ if (canceled && isDebugEnabled())
+ logDebug(fctName, L"Connection interrupted.");
+
+ // At this point, we either got an error or were canceled. Return an error
+ // anyway
return false;
}
@@ -140,11 +199,11 @@
if (close(socket_fd) != 0)
#endif
{
- wstring msg(L"Could not close socket. Error code is ");
+ wstring msg(L"Could not close socket. Error is ");
#ifdef __MINGW32__
msg += toUserString(WSAGetLastError());
#else
- msg += toUserString(errno);
+ msg += StaticCodecs::fromString(strerror(errno));;
#endif
throw SocketIOException(msg);
return false;
@@ -169,7 +228,7 @@
int32_t netlen = htonl(utf8str.length());
- //First write number of bytes to follow as
+ // First write number of bytes to follow as
sendToSocket(fctName, L"UTF string", &netlen, sizeof(netlen),
SOCKET_SEND_FLAGS);
sendToSocket(fctName, L"UTF string", utf8str.data(), utf8str.length(),
SOCKET_SEND_FLAGS);
@@ -180,7 +239,7 @@
throw (SocketIOException, CodecException, UnexpectedException)
{
wstring fctName(L"JavaSocket::readJavaUTF");
-
+
int32_t lenRecNet;
//the converted size
uint16_t lenRec;
@@ -220,7 +279,7 @@
// the data will be received with network byte order.
// so we must convert it after reception
int32_t rec = 0;
-
+
receiveFromSocket(fctName, L"Int32", &rec, sizeof(rec), 0);
i = ntohl(rec);
}
@@ -241,7 +300,7 @@
// the data will be received with network byte order.
// so we must convert it after reception
int64_t rec = 0;
-
+
receiveFromSocket(fctName, L"Int64", &rec, sizeof(rec), 0);
i = ntohll(rec);
}
@@ -281,7 +340,7 @@
}
/* static */
-int JavaSocket::pollOnSingleFd(int socketFd, int pollTimeoutInMs)
+int JavaSocket::pollOnSingleFd(int socketFd, int pollTimeoutInMs, bool
pollOnWrites/*=false*/)
{
// return of poll/select function
int retVal = 0;
@@ -289,7 +348,10 @@
// create poll fd
struct pollfd pfd;
pfd.fd = socketFd;
- pfd.events = POLLIN|POLLPRI;
+ if (pollOnWrites)
+ pfd.events = POLLOUT;
+ else
+ pfd.events = POLLIN|POLLPRI;
// The pfd.revents is is going to be modified
retVal = poll(&pfd, 1, pollTimeoutInMs);
#else
@@ -306,7 +368,9 @@
{
if (isWarnEnabled())
{
- logWarn(L"JavaSocket::pollOnSingleFd", wstring(POLL_FUNCTION) + L"
returned error #"+toUserString(errno));
+ logWarn(L"JavaSocket::pollOnSingleFd",
+ wstring(POLL_FUNCTION) + L" returned error: "
+ + StaticCodecs::fromString(strerror(errno)));
}
}
return retVal;
@@ -316,9 +380,9 @@
int JavaSocket::recvFully(void *buf, const int len, const int flags)
const throw (SocketIOException, UnexpectedException)
{
- int alreadyRead = 0; //Keeps the whole size read
+ int alreadyRead = 0; //Keeps the whole size read
int readThisTime = 0; //The size read at each receive
- while (connected && alreadyRead < len)
+ while (!canceled && alreadyRead < len)
{
int retVal = pollOnSingleFd(socket_fd, 1000); // 1000 = 1 second timeout
// if (retVal == 0) => timeout, no response, do nothing
@@ -384,9 +448,35 @@
// throw SocketIOException(fctName + L"something is really wrong!");
}
+void JavaSocket::setBlockingMode(const wstring& fctName, bool blocking)
+ throw (ConnectionException, UnexpectedException)
+{
+ long fcntlopt = fcntl(socket_fd, F_GETFL, NULL);
+ if (fcntlopt < 0)
+ {
+ wstring msg = L"Could not get socket options: fcntl returned error: ";
+ msg += StaticCodecs::fromString(strerror(errno));
+ if (isErrorEnabled())
+ logError(fctName, msg);
+ throw ConnectionException(msg);
+ }
+ if (blocking)
+ fcntlopt &= (~O_NONBLOCK);
+ else
+ fcntlopt |= O_NONBLOCK;
+ if(fcntl(socket_fd, F_SETFL, fcntlopt) < 0)
+ {
+ wstring msg = L"Could not set socket options: fcntl returned error: ";
+ msg += StaticCodecs::fromString(strerror(errno));
+ if (isErrorEnabled())
+ logError(fctName, msg);
+ throw ConnectionException(msg);
+ }
+}
+
const uint64_t JavaSocket::ntohll(const uint64_t &n) const
{
-#ifndef __BYTE_ORDER // typically found through <sys/param.h>
+#ifndef __BYTE_ORDER // typically found through <sys/param.h>
// or <netinet/*.h>
#error "__BYTE_ORDER undefined: unknown endianness, can't implement 64bits
swaps"
#endif
_______________________________________________
Carob-commits mailing list
[email protected]
https://forge.continuent.org/mailman/listinfo/carob-commits