Date: Monday, December 4, 2006 @ 15:13:48
Author: gilles
Path: /cvsroot/carob/carob
Modified: include/ControllerPool.hpp (1.7 -> 1.8)
include/ControllerStateChangedCallback.hpp (1.2 -> 1.3)
include/JavaSocket.hpp (1.35 -> 1.36) src/ControllerInfo.cpp
(1.4 -> 1.5) src/ControllerPool.cpp (1.11 -> 1.12)
src/ControllerStateChangedCallback.cpp (1.3 -> 1.4)
src/ControllerWatcher.cpp (1.5 -> 1.6) src/JavaSocket.cpp (1.58
-> 1.59)
Removing possibly non-thread safe shutdown(socketfd) call, replacing by
non-blockant-like reads using poll:
. Callback now takes JavaSockets (instead of fds) and calls shutdown() on these
objects
. JavaSocket reads are now done after a poll() (or select), so they can be
interrupted by calling shutdown(); they will then throw a SocketIOException
Centralized poll call in a static function of JavaSocket class
Fixed bug in ltPoolIndex, was not comparing same length controller lists
--------------------------------------------+
include/ControllerPool.hpp | 7 +-
include/ControllerStateChangedCallback.hpp | 5 +
include/JavaSocket.hpp | 26 ++++++++--
src/ControllerInfo.cpp | 5 +
src/ControllerPool.cpp | 11 ++--
src/ControllerStateChangedCallback.cpp | 16 +++---
src/ControllerWatcher.cpp | 39 +--------------
src/JavaSocket.cpp | 69 ++++++++++++++++++++++-----
8 files changed, 109 insertions(+), 69 deletions(-)
Index: carob/include/ControllerPool.hpp
diff -u carob/include/ControllerPool.hpp:1.7
carob/include/ControllerPool.hpp:1.8
--- carob/include/ControllerPool.hpp:1.7 Tue Nov 28 17:07:55 2006
+++ carob/include/ControllerPool.hpp Mon Dec 4 15:13:48 2006
@@ -53,7 +53,7 @@
class ControllerWatcher;
class SocketKillerCallback;
-
+class JavaSocket;
/**
* Abstract class over each policy implementation, which is used by the driver
* to choose a controller to connect to.
@@ -94,7 +94,8 @@
* @param controller the controller to which the socket is connected
* @param socketFd the socket file descriptor to register
*/
- void registerSocket(const ControllerInfo& controller,
int socketFd);
+ void registerSocket(const ControllerInfo& controller,
+ JavaSocket* socket);
/**
* Tell the watcher that a failure has been detected on the given
controller.<br>
@@ -243,7 +244,7 @@
* distribution
* @throws DriverException if the given policy is invalid
*/
- static AbstractControllerPool& getPool(std::vector<ControllerInfo> ctrls,
+ static AbstractControllerPool& getPool(std::vector<ControllerInfo>& ctrls,
ConnectPolicy cp, int pingDelayInMs,
int controllerTimeoutInMs)
throw (DriverException,
UnexpectedException);
Index: carob/include/ControllerStateChangedCallback.hpp
diff -u carob/include/ControllerStateChangedCallback.hpp:1.2
carob/include/ControllerStateChangedCallback.hpp:1.3
--- carob/include/ControllerStateChangedCallback.hpp:1.2 Wed Nov 29
09:45:49 2006
+++ carob/include/ControllerStateChangedCallback.hpp Mon Dec 4 15:13:48 2006
@@ -31,6 +31,7 @@
namespace CarobNS {
class AbstractControllerPool;
+class JavaSocket;
/**
* Provides the two methods that will be called when a controller is detected
as
@@ -81,11 +82,11 @@
/** Just informs policy that a controller came back */
void onControllerUp(const ControllerInfo& ctrl);
/** Adds the given socket to the list of monitored sockets for given
controller */
- void registerSocket(const ControllerInfo& ctrl, int socketFd);
+ void registerSocket(const ControllerInfo& ctrl, JavaSocket* socketFd);
private:
/** The list of controllers with their set of sockets */
- std::map<ControllerInfo, std::vector<int> > controllers_and_sockets;
+ std::map<ControllerInfo, std::vector<JavaSocket*> > controllers_and_sockets;
/** Policy that created us, to tell about controller state changes */
AbstractControllerPool& policy;
/** controllers&sockets synchronization object */
Index: carob/include/JavaSocket.hpp
diff -u carob/include/JavaSocket.hpp:1.35 carob/include/JavaSocket.hpp:1.36
--- carob/include/JavaSocket.hpp:1.35 Fri Dec 1 14:56:24 2006
+++ carob/include/JavaSocket.hpp Mon Dec 4 15:13:48 2006
@@ -169,20 +169,36 @@
void writeJavaBytes(int32_t length, java_byte* data) const
throw (SocketIOException, UnexpectedException);
/**
- * Gets the socket file descriptor
- * @return the file descriptor
+ * Interrupts read operations on this socket, mark it as disconnected and
+ * makes read operations throw a SocketIOException
+ * @see #recvFully(void *buf, const int len, const int flags) const
*/
- int getFd() const { return socket_fd; }
+ void shutdown() { connected = false; }
+ /**
+ * Execute poll (or select depending on compilation flag CAROB_USE_SELECT) on
+ * the given file descriptor and return after either data is ready to be read
+ * or given timeout is elapsed
+ * @param socketFd socket file descriptor on which to wait for incomming data
+ * @param pollTimeoutInMs timeout after which to return if no data is ready
to
+ * be read on socket
+ * @return >0 if data is ready to be read, 0 if timeout reached, -1 if an
+ * error occured
+ */
+ static int pollOnSingleFd(int socketFd, int pollTimeoutInMs);
protected:
/**
- * Substitute for recv. Loops/blocks until full length has been received.
+ * Substitute for recv. Waits for incomming data by calling pollOnSingleFd
+ * and loops until full length has been received or an erro occured. If
+ * shutdown() is called during the loop, throws a SocketIOException to inform
+ * callers that the socket is not longer readable
* @param buf data to send
* @param len full buffer length
* @param flags send options, see recv man page
* @return the total number of bytes send, -1 in case of failure
+ * @throw SocketIOException if interrupted by shutdown() function
*/
int32_t recvFully(void *buf, const int len, const int flags)
- const;
+ const throw (SocketIOException, UnexpectedException);
/**
* Wrapper over send(...) function to handle errors and throw exceptions
* @param fctName name of the calling function (for logging purposes)
Index: carob/src/ControllerInfo.cpp
diff -u carob/src/ControllerInfo.cpp:1.4 carob/src/ControllerInfo.cpp:1.5
--- carob/src/ControllerInfo.cpp:1.4 Fri Dec 1 15:07:42 2006
+++ carob/src/ControllerInfo.cpp Mon Dec 4 15:13:48 2006
@@ -181,7 +181,7 @@
int error = getnameinfo(&UDP_addr, UDP_addr_len, hostname, sizeof(hostname),
NULL, 0, NI_NUMERICHOST);
if (error != 0)
{
- wstring wMsg(L"Error while executing getnameinfo() saf=" +
toWString(UDP_addr.sa_family) + L" inet="+toWString(PF_INET) +
L"inet6="+toWString(PF_INET6) + L" afinet="+toWString(AF_INET) +
L"afinet6="+toWString(AF_INET6));
+ wstring wMsg(L"Error while executing getnameinfo()");
#ifndef __MINGW32__
const char* gaiMsg = gai_strerror(error);
wMsg += L". Error was: " + fromString(gaiMsg);
@@ -212,7 +212,7 @@
}
socketPtr->create(family);
// register the socket to our policy so it can be killed during connection
- pool.registerSocket(*this, socketPtr->getFd());
+ pool.registerSocket(*this, socketPtr);
if (isDebugEnabled())
logDebug(fctName, L"Connecting...");
@@ -244,6 +244,7 @@
addrIter = addrIter->ai_next;
}
// could not connect to any of the addresses
+ freeaddrinfo(addr);
throw ConnectionException(L"Unable to connect. Last error code was "
+ toUserString(errno));
}
Index: carob/src/ControllerPool.cpp
diff -u carob/src/ControllerPool.cpp:1.11 carob/src/ControllerPool.cpp:1.12
--- carob/src/ControllerPool.cpp:1.11 Thu Nov 30 21:10:28 2006
+++ carob/src/ControllerPool.cpp Mon Dec 4 15:13:48 2006
@@ -74,6 +74,8 @@
}
catch (ConnectionException cause)
{
+ alive_controllers.clear();
+ delete callback_ptr;
throw DriverException(
L"Controller watcher creation failed!");
}
@@ -85,9 +87,10 @@
alive_controllers.clear();
}
-void AbstractControllerPool::registerSocket(const ControllerInfo& controller,
int socketFd)
+void AbstractControllerPool::registerSocket(const ControllerInfo& controller,
+ JavaSocket* socket)
{
- callback_ptr->registerSocket(controller, socketFd);
+ callback_ptr->registerSocket(controller, socket);
}
void AbstractControllerPool::forceControllerDown(const ControllerInfo&
controller)
@@ -174,7 +177,7 @@
// same policy, compare number of controllers
if (pi1.controllers.size() < pi2.controllers.size())
return true;
- else if (pi1.controllers.size() < pi2.controllers.size())
+ else if (pi1.controllers.size() > pi2.controllers.size())
return false;
// same size, compare controllers themselves
vector<ControllerInfo>::const_iterator iter_pi1 = pi1.controllers.begin();
@@ -198,7 +201,7 @@
std::map<PoolIndex, AbstractControllerPool*, ltPoolIndex>
ControllerPoolManager::pool_map;
/*static*/
-AbstractControllerPool& ControllerPoolManager::getPool(vector<ControllerInfo>
ctrls,
+AbstractControllerPool& ControllerPoolManager::getPool(vector<ControllerInfo>&
ctrls,
ConnectPolicy cp, int pingDelayInMs, int controllerTimeoutInMs)
throw (DriverException, UnexpectedException)
{
Index: carob/src/ControllerStateChangedCallback.cpp
diff -u carob/src/ControllerStateChangedCallback.cpp:1.3
carob/src/ControllerStateChangedCallback.cpp:1.4
--- carob/src/ControllerStateChangedCallback.cpp:1.3 Fri Dec 1 15:47:31 2006
+++ carob/src/ControllerStateChangedCallback.cpp Mon Dec 4 15:13:48 2006
@@ -23,6 +23,7 @@
#include "ControllerStateChangedCallback.hpp"
#include "ControllerInfo.hpp"
+#include "JavaSocket.hpp"
#include "ControllerPool.hpp"
#include "Common.hpp"
@@ -40,7 +41,7 @@
SocketKillerCallback::~SocketKillerCallback()
{
- for (map<ControllerInfo, vector<int> >::iterator iter =
controllers_and_sockets.begin();
+ for (map<ControllerInfo, vector<JavaSocket*> >::iterator iter =
controllers_and_sockets.begin();
iter != controllers_and_sockets.end(); iter++)
{
iter->second.clear();
@@ -48,11 +49,11 @@
controllers_and_sockets.clear();
}
-void SocketKillerCallback::registerSocket(const ControllerInfo& ctrl, int s)
+void SocketKillerCallback::registerSocket(const ControllerInfo& ctrl,
JavaSocket* s)
{
LockScope ls(&controllers_and_sockets_CS);
- vector<int> sockets;
- map<ControllerInfo, vector<int> >::iterator iter =
controllers_and_sockets.find(ctrl);
+ vector<JavaSocket*> sockets;
+ map<ControllerInfo, vector<JavaSocket*> >::iterator iter =
controllers_and_sockets.find(ctrl);
if (iter != controllers_and_sockets.end())
{
sockets = iter->second;
@@ -67,17 +68,18 @@
// tell policy asap
policy.controllerDown(ctrl);
LockScope ls(&controllers_and_sockets_CS);
- map<ControllerInfo, vector<int> >::iterator iter =
controllers_and_sockets.find(ctrl);
+ map<ControllerInfo, vector<JavaSocket*> >::iterator iter =
controllers_and_sockets.find(ctrl);
if (iter != controllers_and_sockets.end())
{
if (isDebugEnabled())
logDebug(fctName, L"Shutting down sockets connected to controller "
+ static_cast<wstring>(ctrl));
- for (vector<int>::iterator socketIter = iter->second.begin();
+ for (vector<JavaSocket*>::iterator socketIter = iter->second.begin();
socketIter != iter->second.end(); socketIter++)
{
- shutdown(*socketIter, SHUT_RDWR);
+ (*socketIter)->shutdown();
}
+ // Clean-up shut down sockets
iter->second.clear();
}
}
Index: carob/src/ControllerWatcher.cpp
diff -u carob/src/ControllerWatcher.cpp:1.5 carob/src/ControllerWatcher.cpp:1.6
--- carob/src/ControllerWatcher.cpp:1.5 Thu Nov 30 21:10:28 2006
+++ carob/src/ControllerWatcher.cpp Mon Dec 4 15:13:48 2006
@@ -24,6 +24,7 @@
#include "ControllerPingSender.hpp"
#include "ControllerStateChangedCallback.hpp"
#include "WatchedControllers.hpp"
+#include "JavaSocket.hpp"
#include "ControllerInfo.hpp"
#include "Common.hpp"
@@ -31,13 +32,6 @@
#include <unistd.h> // usleep()
-#ifndef CAROB_USE_SELECT
-#include <sys/poll.h>
-#define POLL_FUNCTION L"Poll"
-#else
-#define POLL_FUNCTION L"Select"
-#endif
-
using namespace CarobNS;
using std::vector;
using std::wstring;
@@ -107,7 +101,7 @@
throw ConnectionException(L"Could not turn ping socket to non-blocking
mode.");
}
#endif
-
+
pinger_ptr = new ControllerPingSender(*controllers_ptr, socket_fd,
ping_delay_in_ms);
pthread_create(&pinger_thread, NULL, PingSenderThread, pinger_ptr);
}
@@ -162,33 +156,8 @@
while (!is_stopped)
{
// return of poll/select function
- int retVal = 0;
-#ifndef CAROB_USE_SELECT
- // create poll fd
- struct pollfd pfd;
- pfd.fd = socket_fd;
- pfd.events = POLLIN|POLLPRI;
- // The pfd.revents is is going to be modified
- retVal = poll(&pfd, 1, ping_delay_in_ms);
-#else
- // create select fd
- fd_set sfd;
- FD_ZERO(&sfd);
- struct timeval tv;
- tv.tv_sec = 0;
- tv.tv_usec = ping_delay_in_ms*1000; // milli -> micro seconds conversion
- FD_SET(socket_fd, &sfd);
- retVal = select(socket_fd+1, &sfd, NULL, NULL, &tv);
-#endif
- if (retVal == -1)
- {
- if (isWarnEnabled())
- {
- logWarn(fctName, wstring(POLL_FUNCTION) + L" returned error
#"+toUserString(errno));
- }
- // clean-up will be done a the end of this function
- }
- // if (retVal == 0) => timeout, no respond, do nothing
+ int retVal = JavaSocket::pollOnSingleFd(socket_fd, ping_delay_in_ms);
+ // if (retVal == 0) => timeout, no response, do nothing
if (retVal > 0)
{
// process all responses received until now
Index: carob/src/JavaSocket.cpp
diff -u carob/src/JavaSocket.cpp:1.58 carob/src/JavaSocket.cpp:1.59
--- carob/src/JavaSocket.cpp:1.58 Fri Dec 1 14:56:24 2006
+++ carob/src/JavaSocket.cpp Mon Dec 4 15:13:48 2006
@@ -35,6 +35,13 @@
#include <errno.h>
#include <fcntl.h>
+#ifndef CAROB_USE_SELECT
+#include <sys/poll.h>
+#define POLL_FUNCTION L"Poll"
+#else
+#define POLL_FUNCTION L"Select"
+#endif
+
using std::wstring;
using namespace CarobNS;
@@ -271,26 +278,66 @@
sendToSocket(fctName, L"JavaBytes", data, length, SOCKET_SEND_FLAGS);
}
+/* static */
+int JavaSocket::pollOnSingleFd(int socketFd, int pollTimeoutInMs)
+{
+ // return of poll/select function
+ int retVal = 0;
+#ifndef CAROB_USE_SELECT
+ // create poll fd
+ struct pollfd pfd;
+ pfd.fd = socketFd;
+ pfd.events = POLLIN|POLLPRI;
+ // The pfd.revents is is going to be modified
+ retVal = poll(&pfd, 1, pollTimeoutInMs);
+#else
+ // create select fd
+ fd_set sfd;
+ FD_ZERO(&sfd);
+ struct timeval tv;
+ tv.tv_sec = 0;
+ tv.tv_usec = pollTimeout*1000; // milli -> micro seconds conversion
+ FD_SET(socket_fd, &sfd);
+ retVal = select(socket_fd+1, &sfd, NULL, NULL, &tv);
+#endif
+ if (retVal == -1)
+ {
+ if (isWarnEnabled())
+ {
+ logWarn(L"JavaSocket::pollOnSingleFd", wstring(POLL_FUNCTION) + L"
returned error #"+toUserString(errno));
+ }
+ }
+ return retVal;
+}
+
/** Returns the number of octets read or the (negative) error from recv() */
int JavaSocket::recvFully(void *buf, const int len, const int flags)
- const
+ const throw (SocketIOException, UnexpectedException)
{
int alreadyRead = 0; //Keeps the whole size read
int readThisTime = 0; //The size read at each receive
- while (alreadyRead < len)
+ while (connected && alreadyRead < len)
{
- readThisTime = recv(socket_fd,
- (char*)(static_cast<uint8_t*>(
-
((static_cast<uint8_t*>(buf))+alreadyRead))),
- len-alreadyRead,
- flags);
- if (readThisTime <= 0)
+ int retVal = pollOnSingleFd(socket_fd, 1000); // 1000 = 1 second timeout
+ // if (retVal == 0) => timeout, no response, do nothing
+ if (retVal > 0) // data is ready to be read
{
- //this is an error
- return readThisTime;
+ readThisTime = recv(socket_fd,
+ (char*)(static_cast<uint8_t*>(
+
((static_cast<uint8_t*>(buf))+alreadyRead))),
+ len-alreadyRead,
+ flags);
+ if (readThisTime <= 0)
+ {
+ //this is an error
+ return readThisTime;
+ }
+ alreadyRead += readThisTime;
}
- alreadyRead += readThisTime;
}
+ // We were shutdown()
+ if (!connected)
+ throw SocketIOException(L"JavaSocket shut down, read aborted");
return alreadyRead;
}
_______________________________________________
Carob-commits mailing list
[email protected]
https://forge.continuent.org/mailman/listinfo/carob-commits