Author: roger
Date: Fri Jul 8 12:23:31 2011
New Revision: 1144286
URL: http://svn.apache.org/viewvc?rev=1144286&view=rev
Log:
THRIFT-1217 Use evutil_socketpair instead of pipe
Patch: alexandre parenteau
Modified:
thrift/trunk/lib/cpp/src/server/TNonblockingServer.cpp
thrift/trunk/lib/cpp/src/server/TNonblockingServer.h
Modified: thrift/trunk/lib/cpp/src/server/TNonblockingServer.cpp
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/cpp/src/server/TNonblockingServer.cpp?rev=1144286&r1=1144285&r2=1144286&view=diff
==============================================================================
--- thrift/trunk/lib/cpp/src/server/TNonblockingServer.cpp (original)
+++ thrift/trunk/lib/cpp/src/server/TNonblockingServer.cpp Fri Jul 8 12:23:31 2011
@@ -22,10 +22,20 @@
#include <transport/TSocket.h>
#include <iostream>
+
+#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
+#endif
+
+#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#include <netinet/tcp.h>
+#endif
+
+#ifdef HAVE_NETDB_H
#include <netdb.h>
+#endif
+
#include <fcntl.h>
#include <errno.h>
#include <assert.h>
@@ -708,7 +718,7 @@ void TNonblockingServer::listenSocket()
#ifdef IPV6_V6ONLY
if (res->ai_family == AF_INET6) {
int zero = 0;
- if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &zero, sizeof(zero))) {
+ if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, const_cast_sockopt(&zero), sizeof(zero))) {
GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
}
}
@@ -718,9 +728,9 @@ void TNonblockingServer::listenSocket()
int one = 1;
// Set reuseaddr to avoid 2MSL delay on server restart
- setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
+ setsockopt(s, SOL_SOCKET, SO_REUSEADDR, const_cast_sockopt(&one), sizeof(one));
- if (bind(s, res->ai_addr, res->ai_addrlen) == -1) {
+ if (::bind(s, res->ai_addr, res->ai_addrlen) == -1) {
close(s);
freeaddrinfo(res0);
throw TException("TNonblockingServer::serve() bind");
@@ -750,20 +760,20 @@ void TNonblockingServer::listenSocket(in
struct linger ling = {0, 0};
// Keepalive to ensure full result flushing
- setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));
+ setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, const_cast_sockopt(&one), sizeof(one));
// Turn linger off to avoid hung sockets
- setsockopt(s, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
+ setsockopt(s, SOL_SOCKET, SO_LINGER, const_cast_sockopt(&ling), sizeof(ling));
// Set TCP nodelay if available, MAC OS X Hack
// See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
#ifndef TCP_NOPUSH
- setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
+ setsockopt(s, IPPROTO_TCP, TCP_NODELAY, const_cast_sockopt(&one), sizeof(one));
#endif
#ifdef TCP_LOW_MIN_RTO
if (TSocket::getUseLowMinRto()) {
- setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, &one, sizeof(one));
+ setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, const_cast_sockopt(&one), sizeof(one));
}
#endif
@@ -777,13 +787,12 @@ void TNonblockingServer::listenSocket(in
}
void TNonblockingServer::createNotificationPipe() {
- if (pipe(notificationPipeFDs_) != 0) {
- GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", errno);
- throw TException("can't create notification pipe");
+ if(evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
+ GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
+ throw TException("can't create notification pipe");
}
- int flags;
- if ((flags = fcntl(notificationPipeFDs_[0], F_GETFL, 0)) < 0 ||
- fcntl(notificationPipeFDs_[0], F_SETFL, flags | O_NONBLOCK) < 0) {
+ if(evutil_make_socket_nonblocking(notificationPipeFDs_[0])<0 ||
+ evutil_make_socket_nonblocking(notificationPipeFDs_[1])<0) {
close(notificationPipeFDs_[0]);
close(notificationPipeFDs_[1]);
throw TException("TNonblockingServer::createNotificationPipe() O_NONBLOCK");
Modified: thrift/trunk/lib/cpp/src/server/TNonblockingServer.h
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/cpp/src/server/TNonblockingServer.h?rev=1144286&r1=1144285&r2=1144286&view=diff
==============================================================================
--- thrift/trunk/lib/cpp/src/server/TNonblockingServer.h (original)
+++ thrift/trunk/lib/cpp/src/server/TNonblockingServer.h Fri Jul 8 12:23:31 2011
@@ -44,6 +44,36 @@ using apache::thrift::concurrency::Threa
// Forward declaration of class
class TConnection;
+#ifdef LIBEVENT_VERSION_NUMBER
+#define LIBEVENT_VERSION_MAJOR (LIBEVENT_VERSION_NUMBER >> 24)
+#define LIBEVENT_VERSION_MINOR ((LIBEVENT_VERSION_NUMBER >> 16) & 0xFF)
+#define LIBEVENT_VERSION_REL ((LIBEVENT_VERSION_NUMBER >> 8) & 0xFF)
+#else
+// assume latest version 1 series
+#define LIBEVENT_VERSION_MAJOR 1
+#define LIBEVENT_VERSION_MINOR 14
+#define LIBEVENT_VERSION_REL 13
+#define LIBEVENT_VERSION_NUMBER ((LIBEVENT_VERSION_MAJOR << 24) | (LIBEVENT_VERSION_MINOR << 16) | (LIBEVENT_VERSION_REL << 8))
+#endif
+
+#if LIBEVENT_VERSION_NUMBER < 0x02000000
+ typedef int evutil_socket_t;
+#endif
+
+#ifndef SOCKOPT_CAST_T
+#define SOCKOPT_CAST_T void
+#endif
+
+template<class T>
+inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v) {
+ return reinterpret_cast<const SOCKOPT_CAST_T*>(v);
+}
+
+template<class T>
+inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
+ return reinterpret_cast<SOCKOPT_CAST_T*>(v);
+}
+
/**
* This is a non-blocking server in C++ for high performance that operates a
* single IO thread. It assumes that all incoming requests are framed with a
@@ -176,7 +206,7 @@ class TNonblockingServer : public TServe
uint64_t nTotalConnectionsDropped_;
/// File descriptors for pipe used for task completion notification.
- int notificationPipeFDs_[2];
+ evutil_socket_t notificationPipeFDs_[2];
/**
* This is a stack of all the objects that have been created but that
@@ -634,7 +664,7 @@ class TNonblockingServer : public TServe
* @param which the flags associated with the event.
* @param v void* callback arg where we placed TNonblockingServer's "this".
*/
- static void eventHandler(int fd, short which, void* v) {
+ static void eventHandler(evutil_socket_t fd, short which, void* v) {
((TNonblockingServer*)v)->handleEvent(fd, which);
}
@@ -874,7 +904,7 @@ class TConnection {
* @param which the flags associated with the event.
* @param v void* callback arg where we placed TConnection's "this".
*/
- static void eventHandler(int fd, short /* which */, void* v) {
+ static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
assert(fd == ((TConnection*)v)->getTSocket()->getSocketFD());
((TConnection*)v)->workSocket();
}
@@ -887,17 +917,17 @@ class TConnection {
*
* @param fd the descriptor the event occured on.
*/
- static void taskHandler(int fd, short /* which */, void* /* v */) {
+ static void taskHandler(evutil_socket_t fd, short /* which */, void* /* v */) {
TConnection* connection;
ssize_t nBytes;
- while ((nBytes = read(fd, (void*)&connection, sizeof(TConnection*)))
+ while ((nBytes = recv(fd, cast_sockopt(&connection), sizeof(TConnection*), 0))
== sizeof(TConnection*)) {
connection->transition();
}
if (nBytes > 0) {
throw TException("TConnection::taskHandler unexpected partial read");
}
- if (errno != EWOULDBLOCK && errno != EAGAIN) {
+ if (errno && errno != EWOULDBLOCK && errno != EAGAIN) {
GlobalOutput.perror("TConnection::taskHandler read failed, resource leak", errno);
}
}
@@ -911,8 +941,8 @@ class TConnection {
*/
bool notifyServer() {
TConnection* connection = this;
- if (write(server_->getNotificationSendFD(), (const void*)&connection,
- sizeof(TConnection*)) != sizeof(TConnection*)) {
+ if (send(server_->getNotificationSendFD(), const_cast_sockopt(&connection),
+ sizeof(TConnection*), 0) != sizeof(TConnection*)) {
return false;
}