http://git-wip-us.apache.org/repos/asf/airavata-php-gateway/blob/d55608f1/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TServerSocket.cpp ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TServerSocket.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TServerSocket.cpp deleted file mode 100644 index 4cecc3b..0000000 --- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TServerSocket.cpp +++ /dev/null @@ -1,490 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <thrift/thrift-config.h> - -#include <cstring> -#include <sys/types.h> -#ifdef HAVE_SYS_SOCKET_H -#include <sys/socket.h> -#endif -#ifdef HAVE_SYS_UN_H -#include <sys/un.h> -#endif -#ifdef HAVE_SYS_POLL_H -#include <sys/poll.h> -#endif -#ifdef HAVE_NETINET_IN_H -#include <netinet/in.h> -#include <netinet/tcp.h> -#endif -#ifdef HAVE_NETDB_H -#include <netdb.h> -#endif -#include <fcntl.h> -#ifdef HAVE_UNISTD_H -#include <unistd.h> -#endif - -#include <thrift/transport/TSocket.h> -#include <thrift/transport/TServerSocket.h> -#include <thrift/transport/PlatformSocket.h> -#include <boost/shared_ptr.hpp> - -#ifndef AF_LOCAL -#define AF_LOCAL AF_UNIX -#endif - -#ifndef SOCKOPT_CAST_T -# ifndef _WIN32 -# define SOCKOPT_CAST_T void -# else -# define SOCKOPT_CAST_T char -# endif // _WIN32 -#endif - -template<class T> -inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v) { - return reinterpret_cast<const SOCKOPT_CAST_T*>(v); -} - -template<class T> -inline SOCKOPT_CAST_T* cast_sockopt(T* v) { - return reinterpret_cast<SOCKOPT_CAST_T*>(v); -} - -namespace apache { namespace thrift { namespace transport { - -using namespace std; -using boost::shared_ptr; - -TServerSocket::TServerSocket(int port) : - port_(port), - serverSocket_(THRIFT_INVALID_SOCKET), - acceptBacklog_(DEFAULT_BACKLOG), - sendTimeout_(0), - recvTimeout_(0), - accTimeout_(-1), - retryLimit_(0), - retryDelay_(0), - tcpSendBuffer_(0), - tcpRecvBuffer_(0), - intSock1_(THRIFT_INVALID_SOCKET), - intSock2_(THRIFT_INVALID_SOCKET) {} - -TServerSocket::TServerSocket(int port, int sendTimeout, int recvTimeout) : - port_(port), - serverSocket_(THRIFT_INVALID_SOCKET), - acceptBacklog_(DEFAULT_BACKLOG), - sendTimeout_(sendTimeout), - recvTimeout_(recvTimeout), - accTimeout_(-1), - retryLimit_(0), - retryDelay_(0), - tcpSendBuffer_(0), - tcpRecvBuffer_(0), - intSock1_(THRIFT_INVALID_SOCKET), - intSock2_(THRIFT_INVALID_SOCKET) {} - -TServerSocket::TServerSocket(string path) : - port_(0), - path_(path), - serverSocket_(THRIFT_INVALID_SOCKET), - acceptBacklog_(DEFAULT_BACKLOG), - sendTimeout_(0), - recvTimeout_(0), - accTimeout_(-1), - retryLimit_(0), - retryDelay_(0), - tcpSendBuffer_(0), - tcpRecvBuffer_(0), - intSock1_(THRIFT_INVALID_SOCKET), - intSock2_(THRIFT_INVALID_SOCKET) {} - -TServerSocket::~TServerSocket() { - close(); -} - -void TServerSocket::setSendTimeout(int sendTimeout) { - sendTimeout_ = sendTimeout; -} - -void TServerSocket::setRecvTimeout(int recvTimeout) { - recvTimeout_ = recvTimeout; -} - -void TServerSocket::setAcceptTimeout(int accTimeout) { - accTimeout_ = accTimeout; -} - -void TServerSocket::setAcceptBacklog(int accBacklog) { - acceptBacklog_ = accBacklog; -} - -void TServerSocket::setRetryLimit(int retryLimit) { - retryLimit_ = retryLimit; -} - -void TServerSocket::setRetryDelay(int retryDelay) { - retryDelay_ = retryDelay; -} - -void TServerSocket::setTcpSendBuffer(int tcpSendBuffer) { - tcpSendBuffer_ = tcpSendBuffer; -} - -void TServerSocket::setTcpRecvBuffer(int tcpRecvBuffer) { - tcpRecvBuffer_ = tcpRecvBuffer; -} - -void TServerSocket::listen() { - THRIFT_SOCKET sv[2]; - if (-1 == THRIFT_SOCKETPAIR(AF_LOCAL, SOCK_STREAM, 0, sv)) { - GlobalOutput.perror("TServerSocket::listen() socketpair() ", THRIFT_GET_SOCKET_ERROR); - intSock1_ = THRIFT_INVALID_SOCKET; - intSock2_ = THRIFT_INVALID_SOCKET; - } else { - intSock1_ = sv[1]; - intSock2_ = sv[0]; - } - - struct addrinfo hints, *res, *res0; - int error; - char port[sizeof("65536") + 1]; - std::memset(&hints, 0, sizeof(hints)); - hints.ai_family = PF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG; - sprintf(port, "%d", port_); - - // Wildcard address - error = getaddrinfo(NULL, port, &hints, &res0); - if (error) { - GlobalOutput.printf("getaddrinfo %d: %s", error, THRIFT_GAI_STRERROR(error)); - close(); - throw TTransportException(TTransportException::NOT_OPEN, "Could not resolve host for server socket."); - } - - // Pick the ipv6 address first since ipv4 addresses can be mapped - // into ipv6 space. - for (res = res0; res; res = res->ai_next) { - if (res->ai_family == AF_INET6 || res->ai_next == NULL) - break; - } - - if (! path_.empty()) { - serverSocket_ = socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP); - } else { - serverSocket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol); - } - - if (serverSocket_ == THRIFT_INVALID_SOCKET) { - int errno_copy = THRIFT_GET_SOCKET_ERROR; - GlobalOutput.perror("TServerSocket::listen() socket() ", errno_copy); - close(); - throw TTransportException(TTransportException::NOT_OPEN, "Could not create server socket.", errno_copy); - } - - // Set THRIFT_NO_SOCKET_CACHING to prevent 2MSL delay on accept - int one = 1; - if (-1 == setsockopt(serverSocket_, SOL_SOCKET, THRIFT_NO_SOCKET_CACHING, - cast_sockopt(&one), sizeof(one))) { - //ignore errors coming out of this setsockopt on Windows. This is because - //SO_EXCLUSIVEADDRUSE requires admin privileges on WinXP, but we don't - //want to force servers to be an admin. -#ifndef _WIN32 - int errno_copy = THRIFT_GET_SOCKET_ERROR; - GlobalOutput.perror("TServerSocket::listen() setsockopt() THRIFT_NO_SOCKET_CACHING ", errno_copy); - close(); - throw TTransportException(TTransportException::NOT_OPEN, "Could not set THRIFT_NO_SOCKET_CACHING", errno_copy); -#endif - } - - // Set TCP buffer sizes - if (tcpSendBuffer_ > 0) { - if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_SNDBUF, - cast_sockopt(&tcpSendBuffer_), sizeof(tcpSendBuffer_))) { - int errno_copy = THRIFT_GET_SOCKET_ERROR; - GlobalOutput.perror("TServerSocket::listen() setsockopt() SO_SNDBUF ", errno_copy); - close(); - throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_SNDBUF", errno_copy); - } - } - - if (tcpRecvBuffer_ > 0) { - if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_RCVBUF, - cast_sockopt(&tcpRecvBuffer_), sizeof(tcpRecvBuffer_))) { - int errno_copy = THRIFT_GET_SOCKET_ERROR; - GlobalOutput.perror("TServerSocket::listen() setsockopt() SO_RCVBUF ", errno_copy); - close(); - throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_RCVBUF", errno_copy); - } - } - - // Defer accept - #ifdef TCP_DEFER_ACCEPT - if (-1 == setsockopt(serverSocket_, SOL_SOCKET, TCP_DEFER_ACCEPT, - &one, sizeof(one))) { - int errno_copy = THRIFT_GET_SOCKET_ERROR; - GlobalOutput.perror("TServerSocket::listen() setsockopt() TCP_DEFER_ACCEPT ", errno_copy); - close(); - throw TTransportException(TTransportException::NOT_OPEN, "Could not set TCP_DEFER_ACCEPT", errno_copy); - } - #endif // #ifdef TCP_DEFER_ACCEPT - - #ifdef IPV6_V6ONLY - if (res->ai_family == AF_INET6 && path_.empty()) { - int zero = 0; - if (-1 == setsockopt(serverSocket_, IPPROTO_IPV6, IPV6_V6ONLY, - cast_sockopt(&zero), sizeof(zero))) { - GlobalOutput.perror("TServerSocket::listen() IPV6_V6ONLY ", THRIFT_GET_SOCKET_ERROR); - } - } - #endif // #ifdef IPV6_V6ONLY - - // Turn linger off, don't want to block on calls to close - struct linger ling = {0, 0}; - if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER, - cast_sockopt(&ling), sizeof(ling))) { - int errno_copy = THRIFT_GET_SOCKET_ERROR; - GlobalOutput.perror("TServerSocket::listen() setsockopt() SO_LINGER ", errno_copy); - close(); - throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_LINGER", errno_copy); - } - - // Unix Sockets do not need that - if (path_.empty()) { - // TCP Nodelay, speed over bandwidth - if (-1 == setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY, - cast_sockopt(&one), sizeof(one))) { - int errno_copy = THRIFT_GET_SOCKET_ERROR; - GlobalOutput.perror("TServerSocket::listen() setsockopt() TCP_NODELAY ", errno_copy); - close(); - throw TTransportException(TTransportException::NOT_OPEN, "Could not set TCP_NODELAY", errno_copy); - } - } - - // Set NONBLOCK on the accept socket - int flags = THRIFT_FCNTL(serverSocket_, THRIFT_F_GETFL, 0); - if (flags == -1) { - int errno_copy = THRIFT_GET_SOCKET_ERROR; - GlobalOutput.perror("TServerSocket::listen() THRIFT_FCNTL() THRIFT_F_GETFL ", errno_copy); - throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_FCNTL() failed", errno_copy); - } - - if (-1 == THRIFT_FCNTL(serverSocket_, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK)) { - int errno_copy = THRIFT_GET_SOCKET_ERROR; - GlobalOutput.perror("TServerSocket::listen() THRIFT_FCNTL() THRIFT_O_NONBLOCK ", errno_copy); - throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_FCNTL() failed", errno_copy); - } - - // prepare the port information - // we may want to try to bind more than once, since THRIFT_NO_SOCKET_CACHING doesn't - // always seem to work. The client can configure the retry variables. - int retries = 0; - - if (! path_.empty()) { - -#ifndef _WIN32 - - // Unix Domain Socket - struct sockaddr_un address; - socklen_t len; - - if (path_.length() > sizeof(address.sun_path)) { - int errno_copy = THRIFT_GET_SOCKET_ERROR; - GlobalOutput.perror("TSocket::listen() Unix Domain socket path too long", errno_copy); - throw TTransportException(TTransportException::NOT_OPEN, " Unix Domain socket path too long"); - } - - address.sun_family = AF_UNIX; - THRIFT_SNPRINTF(address.sun_path, sizeof(address.sun_path), "%s", path_.c_str()); - len = sizeof(address); - - do { - if (0 == ::bind(serverSocket_, (struct sockaddr *) &address, len)) { - break; - } - // use short circuit evaluation here to only sleep if we need to - } while ((retries++ < retryLimit_) && (THRIFT_SLEEP_SEC(retryDelay_) == 0)); -#else - GlobalOutput.perror("TSocket::open() Unix Domain socket path not supported on windows", -99); - throw TTransportException(TTransportException::NOT_OPEN, " Unix Domain socket path not supported"); -#endif - } else { - do { - if (0 == ::bind(serverSocket_, res->ai_addr, static_cast<int>(res->ai_addrlen))) { - break; - } - // use short circuit evaluation here to only sleep if we need to - } while ((retries++ < retryLimit_) && (THRIFT_SLEEP_SEC(retryDelay_) == 0)); - - // free addrinfo - freeaddrinfo(res0); - } - - // throw an error if we failed to bind properly - if (retries > retryLimit_) { - char errbuf[1024]; - if (! path_.empty()) { - sprintf(errbuf, "TServerSocket::listen() PATH %s", path_.c_str()); - } - else { - sprintf(errbuf, "TServerSocket::listen() BIND %d", port_); - } - GlobalOutput(errbuf); - close(); - throw TTransportException(TTransportException::NOT_OPEN, "Could not bind", - THRIFT_GET_SOCKET_ERROR); - } - - // Call listen - if (-1 == ::listen(serverSocket_, acceptBacklog_)) { - int errno_copy = THRIFT_GET_SOCKET_ERROR; - GlobalOutput.perror("TServerSocket::listen() listen() ", errno_copy); - close(); - throw TTransportException(TTransportException::NOT_OPEN, "Could not listen", errno_copy); - } - - // The socket is now listening! -} - -shared_ptr<TTransport> TServerSocket::acceptImpl() { - if (serverSocket_ == THRIFT_INVALID_SOCKET) { - throw TTransportException(TTransportException::NOT_OPEN, "TServerSocket not listening"); - } - - struct THRIFT_POLLFD fds[2]; - - int maxEintrs = 5; - int numEintrs = 0; - - while (true) { - std::memset(fds, 0 , sizeof(fds)); - fds[0].fd = serverSocket_; - fds[0].events = THRIFT_POLLIN; - if (intSock2_ != THRIFT_INVALID_SOCKET) { - fds[1].fd = intSock2_; - fds[1].events = THRIFT_POLLIN; - } - /* - TODO: if THRIFT_EINTR is received, we'll restart the timeout. - To be accurate, we need to fix this in the future. - */ - int ret = THRIFT_POLL(fds, 2, accTimeout_); - - if (ret < 0) { - // error cases - if (THRIFT_GET_SOCKET_ERROR == THRIFT_EINTR && (numEintrs++ < maxEintrs)) { - // THRIFT_EINTR needs to be handled manually and we can tolerate - // a certain number - continue; - } - int errno_copy = THRIFT_GET_SOCKET_ERROR; - GlobalOutput.perror("TServerSocket::acceptImpl() THRIFT_POLL() ", errno_copy); - throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy); - } else if (ret > 0) { - // Check for an interrupt signal - if (intSock2_ != THRIFT_INVALID_SOCKET - && (fds[1].revents & THRIFT_POLLIN)) { - int8_t buf; - if (-1 == recv(intSock2_, cast_sockopt(&buf), sizeof(int8_t), 0)) { - GlobalOutput.perror("TServerSocket::acceptImpl() recv() interrupt ", THRIFT_GET_SOCKET_ERROR); - } - throw TTransportException(TTransportException::INTERRUPTED); - } - - // Check for the actual server socket being ready - if (fds[0].revents & THRIFT_POLLIN) { - break; - } - } else { - GlobalOutput("TServerSocket::acceptImpl() THRIFT_POLL 0"); - throw TTransportException(TTransportException::UNKNOWN); - } - } - - struct sockaddr_storage clientAddress; - int size = sizeof(clientAddress); - THRIFT_SOCKET clientSocket = ::accept(serverSocket_, - (struct sockaddr *) &clientAddress, - (socklen_t *) &size); - - if (clientSocket == -1) { - int errno_copy = THRIFT_GET_SOCKET_ERROR; - GlobalOutput.perror("TServerSocket::acceptImpl() ::accept() ", errno_copy); - throw TTransportException(TTransportException::UNKNOWN, "accept()", errno_copy); - } - - // Make sure client socket is blocking - int flags = THRIFT_FCNTL(clientSocket, THRIFT_F_GETFL, 0); - if (flags == -1) { - int errno_copy = THRIFT_GET_SOCKET_ERROR; - GlobalOutput.perror("TServerSocket::acceptImpl() THRIFT_FCNTL() THRIFT_F_GETFL ", errno_copy); - throw TTransportException(TTransportException::UNKNOWN, "THRIFT_FCNTL(THRIFT_F_GETFL)", errno_copy); - } - - if (-1 == THRIFT_FCNTL(clientSocket, THRIFT_F_SETFL, flags & ~THRIFT_O_NONBLOCK)) { - int errno_copy = THRIFT_GET_SOCKET_ERROR; - GlobalOutput.perror("TServerSocket::acceptImpl() THRIFT_FCNTL() THRIFT_F_SETFL ~THRIFT_O_NONBLOCK ", errno_copy); - throw TTransportException(TTransportException::UNKNOWN, "THRIFT_FCNTL(THRIFT_F_SETFL)", errno_copy); - } - - shared_ptr<TSocket> client = createSocket(clientSocket); - if (sendTimeout_ > 0) { - client->setSendTimeout(sendTimeout_); - } - if (recvTimeout_ > 0) { - client->setRecvTimeout(recvTimeout_); - } - client->setCachedAddress((sockaddr*) &clientAddress, size); - - return client; -} - -shared_ptr<TSocket> TServerSocket::createSocket(THRIFT_SOCKET clientSocket) { - return shared_ptr<TSocket>(new TSocket(clientSocket)); -} - -void TServerSocket::interrupt() { - if (intSock1_ != THRIFT_INVALID_SOCKET) { - int8_t byte = 0; - if (-1 == send(intSock1_, cast_sockopt(&byte), sizeof(int8_t), 0)) { - GlobalOutput.perror("TServerSocket::interrupt() send() ", THRIFT_GET_SOCKET_ERROR); - } - } -} - -void TServerSocket::close() { - if (serverSocket_ != THRIFT_INVALID_SOCKET) { - shutdown(serverSocket_, THRIFT_SHUT_RDWR); - ::THRIFT_CLOSESOCKET(serverSocket_); - } - if (intSock1_ != THRIFT_INVALID_SOCKET) { - ::THRIFT_CLOSESOCKET(intSock1_); - } - if (intSock2_ != THRIFT_INVALID_SOCKET) { - ::THRIFT_CLOSESOCKET(intSock2_); - } - serverSocket_ = THRIFT_INVALID_SOCKET; - intSock1_ = THRIFT_INVALID_SOCKET; - intSock2_ = THRIFT_INVALID_SOCKET; -} - -}}} // apache::thrift::transport
http://git-wip-us.apache.org/repos/asf/airavata-php-gateway/blob/d55608f1/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TServerSocket.h ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TServerSocket.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TServerSocket.h deleted file mode 100644 index 4a8c029..0000000 --- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TServerSocket.h +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#ifndef _THRIFT_TRANSPORT_TSERVERSOCKET_H_ -#define _THRIFT_TRANSPORT_TSERVERSOCKET_H_ 1 - -#include <thrift/transport/TServerTransport.h> -#include <thrift/transport/PlatformSocket.h> -#include <boost/shared_ptr.hpp> - -namespace apache { namespace thrift { namespace transport { - -class TSocket; - -/** - * Server socket implementation of TServerTransport. Wrapper around a unix - * socket listen and accept calls. - * - */ -class TServerSocket : public TServerTransport { - public: - const static int DEFAULT_BACKLOG = 1024; - - TServerSocket(int port); - TServerSocket(int port, int sendTimeout, int recvTimeout); - TServerSocket(std::string path); - - ~TServerSocket(); - - void setSendTimeout(int sendTimeout); - void setRecvTimeout(int recvTimeout); - - void setAcceptTimeout(int accTimeout); - void setAcceptBacklog(int accBacklog); - - void setRetryLimit(int retryLimit); - void setRetryDelay(int retryDelay); - - void setTcpSendBuffer(int tcpSendBuffer); - void setTcpRecvBuffer(int tcpRecvBuffer); - - void listen(); - void close(); - - void interrupt(); - - protected: - boost::shared_ptr<TTransport> acceptImpl(); - virtual boost::shared_ptr<TSocket> createSocket(THRIFT_SOCKET client); - - private: - int port_; - std::string path_; - THRIFT_SOCKET serverSocket_; - int acceptBacklog_; - int sendTimeout_; - int recvTimeout_; - int accTimeout_; - int retryLimit_; - int retryDelay_; - int tcpSendBuffer_; - int tcpRecvBuffer_; - - THRIFT_SOCKET intSock1_; - THRIFT_SOCKET intSock2_; -}; - -}}} // apache::thrift::transport - -#endif // #ifndef _THRIFT_TRANSPORT_TSERVERSOCKET_H_ http://git-wip-us.apache.org/repos/asf/airavata-php-gateway/blob/d55608f1/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TServerTransport.h ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TServerTransport.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TServerTransport.h deleted file mode 100644 index 2ddee0d..0000000 --- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TServerTransport.h +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#ifndef _THRIFT_TRANSPORT_TSERVERTRANSPORT_H_ -#define _THRIFT_TRANSPORT_TSERVERTRANSPORT_H_ 1 - -#include <thrift/transport/TTransport.h> -#include <thrift/transport/TTransportException.h> -#include <boost/shared_ptr.hpp> - -namespace apache { namespace thrift { namespace transport { - -/** - * Server transport framework. A server needs to have some facility for - * creating base transports to read/write from. - * - */ -class TServerTransport { - public: - virtual ~TServerTransport() {} - - /** - * Starts the server transport listening for new connections. Prior to this - * call most transports will not return anything when accept is called. - * - * @throws TTransportException if we were unable to listen - */ - virtual void listen() {} - - /** - * Gets a new dynamically allocated transport object and passes it to the - * caller. Note that it is the explicit duty of the caller to free the - * allocated object. The returned TTransport object must always be in the - * opened state. NULL should never be returned, instead an Exception should - * always be thrown. - * - * @return A new TTransport object - * @throws TTransportException if there is an error - */ - boost::shared_ptr<TTransport> accept() { - boost::shared_ptr<TTransport> result = acceptImpl(); - if (!result) { - throw TTransportException("accept() may not return NULL"); - } - return result; - } - - /** - * For "smart" TServerTransport implementations that work in a multi - * threaded context this can be used to break out of an accept() call. - * It is expected that the transport will throw a TTransportException - * with the interrupted error code. - */ - virtual void interrupt() {} - - /** - * Closes this transport such that future calls to accept will do nothing. - */ - virtual void close() = 0; - - protected: - TServerTransport() {} - - /** - * Subclasses should implement this function for accept. - * - * @return A newly allocated TTransport object - * @throw TTransportException If an error occurs - */ - virtual boost::shared_ptr<TTransport> acceptImpl() = 0; - -}; - -}}} // apache::thrift::transport - -#endif // #ifndef _THRIFT_TRANSPORT_TSERVERTRANSPORT_H_ http://git-wip-us.apache.org/repos/asf/airavata-php-gateway/blob/d55608f1/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TShortReadTransport.h ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TShortReadTransport.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TShortReadTransport.h deleted file mode 100644 index 8def354..0000000 --- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TShortReadTransport.h +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#ifndef _THRIFT_TRANSPORT_TSHORTREADTRANSPORT_H_ -#define _THRIFT_TRANSPORT_TSHORTREADTRANSPORT_H_ 1 - -#include <cstdlib> - -#include <thrift/transport/TTransport.h> -#include <thrift/transport/TVirtualTransport.h> - -namespace apache { namespace thrift { namespace transport { namespace test { - -/** - * This class is only meant for testing. It wraps another transport. - * Calls to read are passed through with some probability. Otherwise, - * the read amount is randomly reduced before being passed through. - * - */ -class TShortReadTransport : public TVirtualTransport<TShortReadTransport> { - public: - TShortReadTransport(boost::shared_ptr<TTransport> transport, double full_prob) - : transport_(transport) - , fullProb_(full_prob) - {} - - bool isOpen() { - return transport_->isOpen(); - } - - bool peek() { - return transport_->peek(); - } - - void open() { - transport_->open(); - } - - void close() { - transport_->close(); - } - - uint32_t read(uint8_t* buf, uint32_t len) { - if (len == 0) { - return 0; - } - - if (rand()/(double)RAND_MAX >= fullProb_) { - len = 1 + rand()%len; - } - return transport_->read(buf, len); - } - - void write(const uint8_t* buf, uint32_t len) { - transport_->write(buf, len); - } - - void flush() { - transport_->flush(); - } - - const uint8_t* borrow(uint8_t* buf, uint32_t* len) { - return transport_->borrow(buf, len); - } - - void consume(uint32_t len) { - return transport_->consume(len); - } - - boost::shared_ptr<TTransport> getUnderlyingTransport() { - return transport_; - } - - protected: - boost::shared_ptr<TTransport> transport_; - double fullProb_; -}; - -}}}} // apache::thrift::transport::test - -#endif // #ifndef _THRIFT_TRANSPORT_TSHORTREADTRANSPORT_H_ http://git-wip-us.apache.org/repos/asf/airavata-php-gateway/blob/d55608f1/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TSimpleFileTransport.cpp ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TSimpleFileTransport.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TSimpleFileTransport.cpp deleted file mode 100644 index 9af1445..0000000 --- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TSimpleFileTransport.cpp +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <thrift/thrift-config.h> - -#include <thrift/transport/TSimpleFileTransport.h> - -#include <sys/types.h> -#ifdef HAVE_SYS_STAT_H -#include <sys/stat.h> -#endif -#include <fcntl.h> - -#ifdef _WIN32 -#include <io.h> -#endif - -namespace apache { namespace thrift { namespace transport { - -TSimpleFileTransport:: -TSimpleFileTransport(const std::string& path, bool read, bool write) - : TFDTransport(-1, TFDTransport::CLOSE_ON_DESTROY) { - int flags = 0; - if (read && write) { - flags = O_RDWR; - } else if (read) { - flags = O_RDONLY; - } else if (write) { - flags = O_WRONLY; - } else { - throw TTransportException("Neither READ nor WRITE specified"); - } - if (write) { - flags |= O_CREAT | O_APPEND; - } -#ifndef _WIN32 - mode_t mode = S_IRUSR | S_IWUSR| S_IRGRP | S_IROTH; -#else - int mode = _S_IREAD | _S_IWRITE; -#endif - int fd = ::open(path.c_str(), - flags, - mode); - if (fd < 0) { - throw TTransportException("failed to open file for writing: " + path); - } - setFD(fd); - open(); -} - -}}} // apache::thrift::transport http://git-wip-us.apache.org/repos/asf/airavata-php-gateway/blob/d55608f1/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TSimpleFileTransport.h ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TSimpleFileTransport.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TSimpleFileTransport.h deleted file mode 100644 index 985a1d3..0000000 --- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TSimpleFileTransport.h +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#ifndef _THRIFT_TRANSPORT_TSIMPLEFILETRANSPORT_H_ -#define _THRIFT_TRANSPORT_TSIMPLEFILETRANSPORT_H_ 1 - -#include <thrift/transport/TFDTransport.h> - -namespace apache { namespace thrift { namespace transport { - -/** - * Dead-simple wrapper around a file. - * - * Writeable files are opened with O_CREAT and O_APPEND - */ -class TSimpleFileTransport : public TFDTransport { - public: - TSimpleFileTransport(const std::string& path, - bool read = true, - bool write = false); -}; - -}}} // apache::thrift::transport - -#endif // _THRIFT_TRANSPORT_TSIMPLEFILETRANSPORT_H_ http://git-wip-us.apache.org/repos/asf/airavata-php-gateway/blob/d55608f1/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TSocket.cpp ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TSocket.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TSocket.cpp deleted file mode 100644 index d521bb5..0000000 --- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TSocket.cpp +++ /dev/null @@ -1,813 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <thrift/thrift-config.h> - -#include <cstring> -#include <sstream> -#ifdef HAVE_SYS_SOCKET_H -#include <sys/socket.h> -#endif -#ifdef HAVE_SYS_UN_H -#include <sys/un.h> -#endif -#ifdef HAVE_SYS_POLL_H -#include <sys/poll.h> -#endif -#include <sys/types.h> -#ifdef HAVE_NETINET_IN_H -#include <netinet/in.h> -#include <netinet/tcp.h> -#endif -#ifdef HAVE_UNISTD_H -#include <unistd.h> -#endif -#include <fcntl.h> - -#include <thrift/concurrency/Monitor.h> -#include <thrift/transport/TSocket.h> -#include <thrift/transport/TTransportException.h> -#include <thrift/transport/PlatformSocket.h> - -#ifndef SOCKOPT_CAST_T -# ifndef _WIN32 -# define SOCKOPT_CAST_T void -# else -# define SOCKOPT_CAST_T char -# endif // _WIN32 -#endif - -template<class T> -inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v) { - return reinterpret_cast<const SOCKOPT_CAST_T*>(v); -} - -template<class T> -inline SOCKOPT_CAST_T* cast_sockopt(T* v) { - return reinterpret_cast<SOCKOPT_CAST_T*>(v); -} - -namespace apache { namespace thrift { namespace transport { - -using namespace std; - -// Global var to track total socket sys calls -uint32_t g_socket_syscalls = 0; - -/** - * TSocket implementation. - * - */ - -TSocket::TSocket(string host, int port) : - host_(host), - port_(port), - path_(""), - socket_(THRIFT_INVALID_SOCKET), - connTimeout_(0), - sendTimeout_(0), - recvTimeout_(0), - lingerOn_(1), - lingerVal_(0), - noDelay_(1), - maxRecvRetries_(5) { - recvTimeval_.tv_sec = (int)(recvTimeout_/1000); - recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000); -} - -TSocket::TSocket(string path) : - host_(""), - port_(0), - path_(path), - socket_(THRIFT_INVALID_SOCKET), - connTimeout_(0), - sendTimeout_(0), - recvTimeout_(0), - lingerOn_(1), - lingerVal_(0), - noDelay_(1), - maxRecvRetries_(5) { - recvTimeval_.tv_sec = (int)(recvTimeout_/1000); - recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000); - cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC; -} - -TSocket::TSocket() : - host_(""), - port_(0), - path_(""), - socket_(THRIFT_INVALID_SOCKET), - connTimeout_(0), - sendTimeout_(0), - recvTimeout_(0), - lingerOn_(1), - lingerVal_(0), - noDelay_(1), - maxRecvRetries_(5) { - recvTimeval_.tv_sec = (int)(recvTimeout_/1000); - recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000); - cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC; -} - -TSocket::TSocket(THRIFT_SOCKET socket) : - host_(""), - port_(0), - path_(""), - socket_(socket), - connTimeout_(0), - sendTimeout_(0), - recvTimeout_(0), - lingerOn_(1), - lingerVal_(0), - noDelay_(1), - maxRecvRetries_(5) { - recvTimeval_.tv_sec = (int)(recvTimeout_/1000); - recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000); - cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC; -} - -TSocket::~TSocket() { - close(); -} - -bool TSocket::isOpen() { - return (socket_ != THRIFT_INVALID_SOCKET); -} - -bool TSocket::peek() { - if (!isOpen()) { - return false; - } - uint8_t buf; - int r = static_cast<int>(recv(socket_, cast_sockopt(&buf), 1, MSG_PEEK)); - if (r == -1) { - int errno_copy = THRIFT_GET_SOCKET_ERROR; - #if defined __FreeBSD__ || defined __MACH__ - /* shigin: - * freebsd returns -1 and THRIFT_ECONNRESET if socket was closed by - * the other side - */ - if (errno_copy == THRIFT_ECONNRESET) - { - close(); - return false; - } - #endif - GlobalOutput.perror("TSocket::peek() recv() " + getSocketInfo(), errno_copy); - throw TTransportException(TTransportException::UNKNOWN, "recv()", errno_copy); - } - return (r > 0); -} - -void TSocket::openConnection(struct addrinfo *res) { - - if (isOpen()) { - return; - } - - if (! path_.empty()) { - socket_ = socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP); - } else { - socket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol); - } - - if (socket_ == THRIFT_INVALID_SOCKET) { - int errno_copy = THRIFT_GET_SOCKET_ERROR; - GlobalOutput.perror("TSocket::open() socket() " + getSocketInfo(), errno_copy); - throw TTransportException(TTransportException::NOT_OPEN, "socket()", errno_copy); - } - - // Send timeout - if (sendTimeout_ > 0) { - setSendTimeout(sendTimeout_); - } - - // Recv timeout - if (recvTimeout_ > 0) { - setRecvTimeout(recvTimeout_); - } - - // Linger - setLinger(lingerOn_, lingerVal_); - - // No delay - setNoDelay(noDelay_); - - // Uses a low min RTO if asked to. -#ifdef TCP_LOW_MIN_RTO - if (getUseLowMinRto()) { - int one = 1; - setsockopt(socket_, IPPROTO_TCP, TCP_LOW_MIN_RTO, &one, sizeof(one)); - } -#endif - - - // Set the socket to be non blocking for connect if a timeout exists - int flags = THRIFT_FCNTL(socket_, THRIFT_F_GETFL, 0); - if (connTimeout_ > 0) { - if (-1 == THRIFT_FCNTL(socket_, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK)) { - int errno_copy = THRIFT_GET_SOCKET_ERROR; - GlobalOutput.perror("TSocket::open() THRIFT_FCNTL() " + getSocketInfo(), errno_copy); - throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_FCNTL() failed", errno_copy); - } - } else { - if (-1 == THRIFT_FCNTL(socket_, THRIFT_F_SETFL, flags & ~THRIFT_O_NONBLOCK)) { - int errno_copy = THRIFT_GET_SOCKET_ERROR; - GlobalOutput.perror("TSocket::open() THRIFT_FCNTL " + getSocketInfo(), errno_copy); - throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_FCNTL() failed", errno_copy); - } - } - - // Connect the socket - int ret; - if (! path_.empty()) { - -#ifndef _WIN32 - - struct sockaddr_un address; - socklen_t len; - - if (path_.length() > sizeof(address.sun_path)) { - int errno_copy = THRIFT_GET_SOCKET_ERROR; - GlobalOutput.perror("TSocket::open() Unix Domain socket path too long", errno_copy); - throw TTransportException(TTransportException::NOT_OPEN, " Unix Domain socket path too long"); - } - - address.sun_family = AF_UNIX; - THRIFT_SNPRINTF(address.sun_path, sizeof(address.sun_path), "%s", path_.c_str()); - len = sizeof(address); - ret = connect(socket_, (struct sockaddr *) &address, len); - -#else - GlobalOutput.perror("TSocket::open() Unix Domain socket path not supported on windows", -99); - throw TTransportException(TTransportException::NOT_OPEN, " Unix Domain socket path not supported"); -#endif - - } else { - ret = connect(socket_, res->ai_addr, static_cast<int>(res->ai_addrlen)); - } - - // success case - if (ret == 0) { - goto done; - } - - if ((THRIFT_GET_SOCKET_ERROR != THRIFT_EINPROGRESS) && (THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK)) { - int errno_copy = THRIFT_GET_SOCKET_ERROR; - GlobalOutput.perror("TSocket::open() connect() " + getSocketInfo(), errno_copy); - throw TTransportException(TTransportException::NOT_OPEN, "connect() failed", errno_copy); - } - - - struct THRIFT_POLLFD fds[1]; - std::memset(fds, 0 , sizeof(fds)); - fds[0].fd = socket_; - fds[0].events = THRIFT_POLLOUT; - ret = THRIFT_POLL(fds, 1, connTimeout_); - - if (ret > 0) { - // Ensure the socket is connected and that there are no errors set - int val; - socklen_t lon; - lon = sizeof(int); - int ret2 = getsockopt(socket_, SOL_SOCKET, SO_ERROR, cast_sockopt(&val), &lon); - if (ret2 == -1) { - int errno_copy = THRIFT_GET_SOCKET_ERROR; - GlobalOutput.perror("TSocket::open() getsockopt() " + getSocketInfo(), errno_copy); - throw TTransportException(TTransportException::NOT_OPEN, "getsockopt()", errno_copy); - } - // no errors on socket, go to town - if (val == 0) { - goto done; - } - GlobalOutput.perror("TSocket::open() error on socket (after THRIFT_POLL) " + getSocketInfo(), val); - throw TTransportException(TTransportException::NOT_OPEN, "socket open() error", val); - } else if (ret == 0) { - // socket timed out - string errStr = "TSocket::open() timed out " + getSocketInfo(); - GlobalOutput(errStr.c_str()); - throw TTransportException(TTransportException::NOT_OPEN, "open() timed out"); - } else { - // error on THRIFT_POLL() - int errno_copy = THRIFT_GET_SOCKET_ERROR; - GlobalOutput.perror("TSocket::open() THRIFT_POLL() " + getSocketInfo(), errno_copy); - throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_POLL() failed", errno_copy); - } - - done: - // Set socket back to normal mode (blocking) - THRIFT_FCNTL(socket_, THRIFT_F_SETFL, flags); - - if (path_.empty()) { - setCachedAddress(res->ai_addr, static_cast<socklen_t>(res->ai_addrlen)); - } -} - -void TSocket::open() { - if (isOpen()) { - return; - } - if (! path_.empty()) { - unix_open(); - } else { - local_open(); - } -} - -void TSocket::unix_open(){ - if (! path_.empty()) { - // Unix Domain SOcket does not need addrinfo struct, so we pass NULL - openConnection(NULL); - } -} - -void TSocket::local_open(){ - -#ifdef _WIN32 - TWinsockSingleton::create(); -#endif // _WIN32 - - if (isOpen()) { - return; - } - - // Validate port number - if (port_ < 0 || port_ > 0xFFFF) { - throw TTransportException(TTransportException::NOT_OPEN, "Specified port is invalid"); - } - - struct addrinfo hints, *res, *res0; - res = NULL; - res0 = NULL; - int error; - char port[sizeof("65535")]; - std::memset(&hints, 0, sizeof(hints)); - hints.ai_family = PF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG; - sprintf(port, "%d", port_); - - error = getaddrinfo(host_.c_str(), port, &hints, &res0); - - if (error) { - string errStr = "TSocket::open() getaddrinfo() " + getSocketInfo() + string(THRIFT_GAI_STRERROR(error)); - GlobalOutput(errStr.c_str()); - close(); - throw TTransportException(TTransportException::NOT_OPEN, "Could not resolve host for client socket."); - } - - // Cycle through all the returned addresses until one - // connects or push the exception up. - for (res = res0; res; res = res->ai_next) { - try { - openConnection(res); - break; - } catch (TTransportException&) { - if (res->ai_next) { - close(); - } else { - close(); - freeaddrinfo(res0); // cleanup on failure - throw; - } - } - } - - // Free address structure memory - freeaddrinfo(res0); -} - -void TSocket::close() { - if (socket_ != THRIFT_INVALID_SOCKET) { - shutdown(socket_, THRIFT_SHUT_RDWR); - ::THRIFT_CLOSESOCKET(socket_); - } - socket_ = THRIFT_INVALID_SOCKET; -} - -void TSocket::setSocketFD(THRIFT_SOCKET socket) { - if (socket_ != THRIFT_INVALID_SOCKET) { - close(); - } - socket_ = socket; -} - -uint32_t TSocket::read(uint8_t* buf, uint32_t len) { - if (socket_ == THRIFT_INVALID_SOCKET) { - throw TTransportException(TTransportException::NOT_OPEN, "Called read on non-open socket"); - } - - int32_t retries = 0; - - // THRIFT_EAGAIN can be signalled both when a timeout has occurred and when - // the system is out of resources (an awesome undocumented feature). - // The following is an approximation of the time interval under which - // THRIFT_EAGAIN is taken to indicate an out of resources error. - uint32_t eagainThresholdMicros = 0; - if (recvTimeout_) { - // if a readTimeout is specified along with a max number of recv retries, then - // the threshold will ensure that the read timeout is not exceeded even in the - // case of resource errors - eagainThresholdMicros = (recvTimeout_*1000)/ ((maxRecvRetries_>0) ? maxRecvRetries_ : 2); - } - - try_again: - // Read from the socket - struct timeval begin; - if (recvTimeout_ > 0) { - THRIFT_GETTIMEOFDAY(&begin, NULL); - } else { - // if there is no read timeout we don't need the TOD to determine whether - // an THRIFT_EAGAIN is due to a timeout or an out-of-resource condition. - begin.tv_sec = begin.tv_usec = 0; - } - int got = static_cast<int>(recv(socket_, cast_sockopt(buf), len, 0)); - int errno_copy = THRIFT_GET_SOCKET_ERROR; //THRIFT_GETTIMEOFDAY can change THRIFT_GET_SOCKET_ERROR - ++g_socket_syscalls; - - // Check for error on read - if (got < 0) { - if (errno_copy == THRIFT_EAGAIN) { - // if no timeout we can assume that resource exhaustion has occurred. - if (recvTimeout_ == 0) { - throw TTransportException(TTransportException::TIMED_OUT, - "THRIFT_EAGAIN (unavailable resources)"); - } - // check if this is the lack of resources or timeout case - struct timeval end; - THRIFT_GETTIMEOFDAY(&end, NULL); - uint32_t readElapsedMicros = static_cast<uint32_t>( - ((end.tv_sec - begin.tv_sec) * 1000 * 1000) - + (((uint64_t)(end.tv_usec - begin.tv_usec)))); - - if (!eagainThresholdMicros || (readElapsedMicros < eagainThresholdMicros)) { - if (retries++ < maxRecvRetries_) { - THRIFT_SLEEP_USEC(50); - goto try_again; - } else { - throw TTransportException(TTransportException::TIMED_OUT, - "THRIFT_EAGAIN (unavailable resources)"); - } - } else { - // infer that timeout has been hit - throw TTransportException(TTransportException::TIMED_OUT, - "THRIFT_EAGAIN (timed out)"); - } - } - - // If interrupted, try again - if (errno_copy == THRIFT_EINTR && retries++ < maxRecvRetries_) { - goto try_again; - } - - #if defined __FreeBSD__ || defined __MACH__ - if (errno_copy == THRIFT_ECONNRESET) { - /* shigin: freebsd doesn't follow POSIX semantic of recv and fails with - * THRIFT_ECONNRESET if peer performed shutdown - * edhall: eliminated close() since we do that in the destructor. - */ - return 0; - } - #endif - -#ifdef _WIN32 - if(errno_copy == WSAECONNRESET) { - return 0; // EOF - } -#endif - - // Now it's not a try again case, but a real probblez - GlobalOutput.perror("TSocket::read() recv() " + getSocketInfo(), errno_copy); - - // If we disconnect with no linger time - if (errno_copy == THRIFT_ECONNRESET) { - throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_ECONNRESET"); - } - - // This ish isn't open - if (errno_copy == THRIFT_ENOTCONN) { - throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_ENOTCONN"); - } - - // Timed out! - if (errno_copy == THRIFT_ETIMEDOUT) { - throw TTransportException(TTransportException::TIMED_OUT, "THRIFT_ETIMEDOUT"); - } - - // Some other error, whatevz - throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy); - } - - // The remote host has closed the socket - if (got == 0) { - // edhall: we used to call close() here, but our caller may want to deal - // with the socket fd and we'll close() in our destructor in any case. - return 0; - } - - // Pack data into string - return got; -} - -void TSocket::write(const uint8_t* buf, uint32_t len) { - uint32_t sent = 0; - - while (sent < len) { - uint32_t b = write_partial(buf + sent, len - sent); - if (b == 0) { - // This should only happen if the timeout set with SO_SNDTIMEO expired. - // Raise an exception. - throw TTransportException(TTransportException::TIMED_OUT, - "send timeout expired"); - } - sent += b; - } -} - -uint32_t TSocket::write_partial(const uint8_t* buf, uint32_t len) { - if (socket_ == THRIFT_INVALID_SOCKET) { - throw TTransportException(TTransportException::NOT_OPEN, "Called write on non-open socket"); - } - - uint32_t sent = 0; - - int flags = 0; -#ifdef MSG_NOSIGNAL - // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we - // check for the THRIFT_EPIPE return condition and close the socket in that case - flags |= MSG_NOSIGNAL; -#endif // ifdef MSG_NOSIGNAL - - int b = static_cast<int>(send(socket_, const_cast_sockopt(buf + sent), len - sent, flags)); - ++g_socket_syscalls; - - if (b < 0) { - if (THRIFT_GET_SOCKET_ERROR == THRIFT_EWOULDBLOCK || THRIFT_GET_SOCKET_ERROR == THRIFT_EAGAIN) { - return 0; - } - // Fail on a send error - int errno_copy = THRIFT_GET_SOCKET_ERROR; - GlobalOutput.perror("TSocket::write_partial() send() " + getSocketInfo(), errno_copy); - - if (errno_copy == THRIFT_EPIPE || errno_copy == THRIFT_ECONNRESET || errno_copy == THRIFT_ENOTCONN) { - close(); - throw TTransportException(TTransportException::NOT_OPEN, "write() send()", errno_copy); - } - - throw TTransportException(TTransportException::UNKNOWN, "write() send()", errno_copy); - } - - // Fail on blocked send - if (b == 0) { - throw TTransportException(TTransportException::NOT_OPEN, "Socket send returned 0."); - } - return b; -} - -std::string TSocket::getHost() { - return host_; -} - -int TSocket::getPort() { - return port_; -} - -void TSocket::setHost(string host) { - host_ = host; -} - -void TSocket::setPort(int port) { - port_ = port; -} - -void TSocket::setLinger(bool on, int linger) { - lingerOn_ = on; - lingerVal_ = linger; - if (socket_ == THRIFT_INVALID_SOCKET) { - return; - } - - struct linger l = {(lingerOn_ ? 1 : 0), lingerVal_}; - int ret = setsockopt(socket_, SOL_SOCKET, SO_LINGER, cast_sockopt(&l), sizeof(l)); - if (ret == -1) { - int errno_copy = THRIFT_GET_SOCKET_ERROR; // Copy THRIFT_GET_SOCKET_ERROR because we're allocating memory. - GlobalOutput.perror("TSocket::setLinger() setsockopt() " + getSocketInfo(), errno_copy); - } -} - -void TSocket::setNoDelay(bool noDelay) { - noDelay_ = noDelay; - if (socket_ == THRIFT_INVALID_SOCKET || !path_.empty()) { - return; - } - - // Set socket to NODELAY - int v = noDelay_ ? 1 : 0; - int ret = setsockopt(socket_, IPPROTO_TCP, TCP_NODELAY, cast_sockopt(&v), sizeof(v)); - if (ret == -1) { - int errno_copy = THRIFT_GET_SOCKET_ERROR; // Copy THRIFT_GET_SOCKET_ERROR because we're allocating memory. - GlobalOutput.perror("TSocket::setNoDelay() setsockopt() " + getSocketInfo(), errno_copy); - } -} - -void TSocket::setConnTimeout(int ms) { - connTimeout_ = ms; -} - -void TSocket::setRecvTimeout(int ms) { - if (ms < 0) { - char errBuf[512]; - sprintf(errBuf, "TSocket::setRecvTimeout with negative input: %d\n", ms); - GlobalOutput(errBuf); - return; - } - recvTimeout_ = ms; - - if (socket_ == THRIFT_INVALID_SOCKET) { - return; - } - - recvTimeval_.tv_sec = (int)(recvTimeout_/1000); - recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000); - - // Copy because THRIFT_POLL may modify - struct timeval r = recvTimeval_; - int ret = setsockopt(socket_, SOL_SOCKET, SO_RCVTIMEO, cast_sockopt(&r), sizeof(r)); - if (ret == -1) { - int errno_copy = THRIFT_GET_SOCKET_ERROR; // Copy THRIFT_GET_SOCKET_ERROR because we're allocating memory. - GlobalOutput.perror("TSocket::setRecvTimeout() setsockopt() " + getSocketInfo(), errno_copy); - } -} - -void TSocket::setSendTimeout(int ms) { - if (ms < 0) { - char errBuf[512]; - sprintf(errBuf, "TSocket::setSendTimeout with negative input: %d\n", ms); - GlobalOutput(errBuf); - return; - } - sendTimeout_ = ms; - - if (socket_ == THRIFT_INVALID_SOCKET) { - return; - } - - struct timeval s = {(int)(sendTimeout_/1000), - (int)((sendTimeout_%1000)*1000)}; - int ret = setsockopt(socket_, SOL_SOCKET, SO_SNDTIMEO, cast_sockopt(&s), sizeof(s)); - if (ret == -1) { - int errno_copy = THRIFT_GET_SOCKET_ERROR; // Copy THRIFT_GET_SOCKET_ERROR because we're allocating memory. - GlobalOutput.perror("TSocket::setSendTimeout() setsockopt() " + getSocketInfo(), errno_copy); - } -} - -void TSocket::setMaxRecvRetries(int maxRecvRetries) { - maxRecvRetries_ = maxRecvRetries; -} - -string TSocket::getSocketInfo() { - std::ostringstream oss; - if (host_.empty() || port_ == 0) { - oss << "<Host: " << getPeerAddress(); - oss << " Port: " << getPeerPort() << ">"; - } else { - oss << "<Host: " << host_ << " Port: " << port_ << ">"; - } - return oss.str(); -} - -std::string TSocket::getPeerHost() { - if (peerHost_.empty() && path_.empty()) { - struct sockaddr_storage addr; - struct sockaddr* addrPtr; - socklen_t addrLen; - - if (socket_ == THRIFT_INVALID_SOCKET) { - return host_; - } - - addrPtr = getCachedAddress(&addrLen); - - if (addrPtr == NULL) { - addrLen = sizeof(addr); - if (getpeername(socket_, (sockaddr*) &addr, &addrLen) != 0) { - return peerHost_; - } - addrPtr = (sockaddr*)&addr; - - setCachedAddress(addrPtr, addrLen); - } - - char clienthost[NI_MAXHOST]; - char clientservice[NI_MAXSERV]; - - getnameinfo((sockaddr*) addrPtr, addrLen, - clienthost, sizeof(clienthost), - clientservice, sizeof(clientservice), 0); - - peerHost_ = clienthost; - } - return peerHost_; -} - -std::string TSocket::getPeerAddress() { - if (peerAddress_.empty() && path_.empty()) { - struct sockaddr_storage addr; - struct sockaddr* addrPtr; - socklen_t addrLen; - - if (socket_ == THRIFT_INVALID_SOCKET) { - return peerAddress_; - } - - addrPtr = getCachedAddress(&addrLen); - - if (addrPtr == NULL) { - addrLen = sizeof(addr); - if (getpeername(socket_, (sockaddr*) &addr, &addrLen) != 0) { - return peerAddress_; - } - addrPtr = (sockaddr*)&addr; - - setCachedAddress(addrPtr, addrLen); - } - - char clienthost[NI_MAXHOST]; - char clientservice[NI_MAXSERV]; - - getnameinfo(addrPtr, addrLen, - clienthost, sizeof(clienthost), - clientservice, sizeof(clientservice), - NI_NUMERICHOST|NI_NUMERICSERV); - - peerAddress_ = clienthost; - peerPort_ = std::atoi(clientservice); - } - return peerAddress_; -} - -int TSocket::getPeerPort() { - getPeerAddress(); - return peerPort_; -} - -void TSocket::setCachedAddress(const sockaddr* addr, socklen_t len) { - if (!path_.empty()) { - return; - } - - switch (addr->sa_family) { - case AF_INET: - if (len == sizeof(sockaddr_in)) { - memcpy((void*)&cachedPeerAddr_.ipv4, (void*)addr, len); - } - break; - - case AF_INET6: - if (len == sizeof(sockaddr_in6)) { - memcpy((void*)&cachedPeerAddr_.ipv6, (void*)addr, len); - } - break; - } -} - -sockaddr* TSocket::getCachedAddress(socklen_t* len) const { - switch (cachedPeerAddr_.ipv4.sin_family) { - case AF_INET: - *len = sizeof(sockaddr_in); - return (sockaddr*) &cachedPeerAddr_.ipv4; - - case AF_INET6: - *len = sizeof(sockaddr_in6); - return (sockaddr*) &cachedPeerAddr_.ipv6; - - default: - return NULL; - } -} - -bool TSocket::useLowMinRto_ = false; -void TSocket::setUseLowMinRto(bool useLowMinRto) { - useLowMinRto_ = useLowMinRto; -} -bool TSocket::getUseLowMinRto() { - return useLowMinRto_; -} - -}}} // apache::thrift::transport http://git-wip-us.apache.org/repos/asf/airavata-php-gateway/blob/d55608f1/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TSocket.h ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TSocket.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TSocket.h deleted file mode 100644 index fd5b961..0000000 --- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TSocket.h +++ /dev/null @@ -1,309 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#ifndef _THRIFT_TRANSPORT_TSOCKET_H_ -#define _THRIFT_TRANSPORT_TSOCKET_H_ 1 - -#include <string> - -#include <thrift/transport/TTransport.h> -#include <thrift/transport/TVirtualTransport.h> -#include <thrift/transport/TServerSocket.h> -#include <thrift/transport/PlatformSocket.h> - -#ifdef HAVE_ARPA_INET_H -#include <arpa/inet.h> -#endif -#ifdef HAVE_SYS_TIME_H -#include <sys/time.h> -#endif -#ifdef HAVE_NETDB_H -#include <netdb.h> -#endif - -namespace apache { namespace thrift { namespace transport { - -/** - * TCP Socket implementation of the TTransport interface. - * - */ -class TSocket : public TVirtualTransport<TSocket> { - public: - /** - * Constructs a new socket. Note that this does NOT actually connect the - * socket. - * - */ - TSocket(); - - /** - * Constructs a new socket. Note that this does NOT actually connect the - * socket. - * - * @param host An IP address or hostname to connect to - * @param port The port to connect on - */ - TSocket(std::string host, int port); - - /** - * Constructs a new Unix domain socket. - * Note that this does NOT actually connect the socket. - * - * @param path The Unix domain socket e.g. "/tmp/ThriftTest.binary.thrift" - */ - TSocket(std::string path); - - /** - * Destroyes the socket object, closing it if necessary. - */ - virtual ~TSocket(); - - /** - * Whether the socket is alive. - * - * @return Is the socket alive? - */ - virtual bool isOpen(); - - /** - * Calls select on the socket to see if there is more data available. - */ - virtual bool peek(); - - /** - * Creates and opens the UNIX socket. - * - * @throws TTransportException If the socket could not connect - */ - virtual void open(); - - /** - * Shuts down communications on the socket. - */ - virtual void close(); - - /** - * Reads from the underlying socket. - */ - virtual uint32_t read(uint8_t* buf, uint32_t len); - - /** - * Writes to the underlying socket. Loops until done or fail. - */ - virtual void write(const uint8_t* buf, uint32_t len); - - /** - * Writes to the underlying socket. Does single send() and returns result. - */ - uint32_t write_partial(const uint8_t* buf, uint32_t len); - - /** - * Get the host that the socket is connected to - * - * @return string host identifier - */ - std::string getHost(); - - /** - * Get the port that the socket is connected to - * - * @return int port number - */ - int getPort(); - - /** - * Set the host that socket will connect to - * - * @param host host identifier - */ - void setHost(std::string host); - - /** - * Set the port that socket will connect to - * - * @param port port number - */ - void setPort(int port); - - /** - * Controls whether the linger option is set on the socket. - * - * @param on Whether SO_LINGER is on - * @param linger If linger is active, the number of seconds to linger for - */ - void setLinger(bool on, int linger); - - /** - * Whether to enable/disable Nagle's algorithm. - * - * @param noDelay Whether or not to disable the algorithm. - * @return - */ - void setNoDelay(bool noDelay); - - /** - * Set the connect timeout - */ - void setConnTimeout(int ms); - - /** - * Set the receive timeout - */ - void setRecvTimeout(int ms); - - /** - * Set the send timeout - */ - void setSendTimeout(int ms); - - /** - * Set the max number of recv retries in case of an THRIFT_EAGAIN - * error - */ - void setMaxRecvRetries(int maxRecvRetries); - - /** - * Get socket information formated as a string <Host: x Port: x> - */ - std::string getSocketInfo(); - - /** - * Returns the DNS name of the host to which the socket is connected - */ - std::string getPeerHost(); - - /** - * Returns the address of the host to which the socket is connected - */ - std::string getPeerAddress(); - - /** - * Returns the port of the host to which the socket is connected - **/ - int getPeerPort(); - - /** - * Returns the underlying socket file descriptor. - */ - THRIFT_SOCKET getSocketFD() { - return socket_; - } - - /** - * (Re-)initialize a TSocket for the supplied descriptor. This is only - * intended for use by TNonblockingServer -- other use may result in - * unfortunate surprises. - * - * @param fd the descriptor for an already-connected socket - */ - void setSocketFD(THRIFT_SOCKET fd); - - /* - * Returns a cached copy of the peer address. - */ - sockaddr* getCachedAddress(socklen_t* len) const; - - /** - * Sets whether to use a low minimum TCP retransmission timeout. - */ - static void setUseLowMinRto(bool useLowMinRto); - - /** - * Gets whether to use a low minimum TCP retransmission timeout. - */ - static bool getUseLowMinRto(); - - /** - * Constructor to create socket from raw UNIX handle. - */ - TSocket(THRIFT_SOCKET socket); - - /** - * Set a cache of the peer address (used when trivially available: e.g. - * accept() or connect()). Only caches IPV4 and IPV6; unset for others. - */ - void setCachedAddress(const sockaddr* addr, socklen_t len); - - protected: - /** connect, called by open */ - void openConnection(struct addrinfo *res); - - /** Host to connect to */ - std::string host_; - - /** Peer hostname */ - std::string peerHost_; - - /** Peer address */ - std::string peerAddress_; - - /** Peer port */ - int peerPort_; - - /** Port number to connect on */ - int port_; - - /** UNIX domain socket path */ - std::string path_; - - /** Underlying UNIX socket handle */ - THRIFT_SOCKET socket_; - - /** Connect timeout in ms */ - int connTimeout_; - - /** Send timeout in ms */ - int sendTimeout_; - - /** Recv timeout in ms */ - int recvTimeout_; - - /** Linger on */ - bool lingerOn_; - - /** Linger val */ - int lingerVal_; - - /** Nodelay */ - bool noDelay_; - - /** Recv EGAIN retries */ - int maxRecvRetries_; - - /** Recv timeout timeval */ - struct timeval recvTimeval_; - - /** Cached peer address */ - union { - sockaddr_in ipv4; - sockaddr_in6 ipv6; - } cachedPeerAddr_; - - /** Whether to use low minimum TCP retransmission timeout */ - static bool useLowMinRto_; - - private: - void unix_open(); - void local_open(); -}; - -}}} // apache::thrift::transport - -#endif // #ifndef _THRIFT_TRANSPORT_TSOCKET_H_ - http://git-wip-us.apache.org/repos/asf/airavata-php-gateway/blob/d55608f1/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TSocketPool.cpp ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TSocketPool.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TSocketPool.cpp deleted file mode 100644 index e0b286a..0000000 --- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TSocketPool.cpp +++ /dev/null @@ -1,254 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <thrift/thrift-config.h> - -#include <algorithm> -#include <iostream> - -#include <thrift/transport/TSocketPool.h> - -namespace apache { namespace thrift { namespace transport { - -using namespace std; - -using boost::shared_ptr; - -/** - * TSocketPoolServer implementation - * - */ -TSocketPoolServer::TSocketPoolServer() - : host_(""), - port_(0), - socket_(THRIFT_INVALID_SOCKET), - lastFailTime_(0), - consecutiveFailures_(0) {} - -/** - * Constructor for TSocketPool server - */ -TSocketPoolServer::TSocketPoolServer(const string &host, int port) - : host_(host), - port_(port), - socket_(THRIFT_INVALID_SOCKET), - lastFailTime_(0), - consecutiveFailures_(0) {} - -/** - * TSocketPool implementation. - * - */ - -TSocketPool::TSocketPool() : TSocket(), - numRetries_(1), - retryInterval_(60), - maxConsecutiveFailures_(1), - randomize_(true), - alwaysTryLast_(true) { -} - -TSocketPool::TSocketPool(const vector<string> &hosts, - const vector<int> &ports) : TSocket(), - numRetries_(1), - retryInterval_(60), - maxConsecutiveFailures_(1), - randomize_(true), - alwaysTryLast_(true) -{ - if (hosts.size() != ports.size()) { - GlobalOutput("TSocketPool::TSocketPool: hosts.size != ports.size"); - throw TTransportException(TTransportException::BAD_ARGS); - } - - for (unsigned int i = 0; i < hosts.size(); ++i) { - addServer(hosts[i], ports[i]); - } -} - -TSocketPool::TSocketPool(const vector<pair<string, int> >& servers) : TSocket(), - numRetries_(1), - retryInterval_(60), - maxConsecutiveFailures_(1), - randomize_(true), - alwaysTryLast_(true) -{ - for (unsigned i = 0; i < servers.size(); ++i) { - addServer(servers[i].first, servers[i].second); - } -} - -TSocketPool::TSocketPool(const vector< shared_ptr<TSocketPoolServer> >& servers) : TSocket(), - servers_(servers), - numRetries_(1), - retryInterval_(60), - maxConsecutiveFailures_(1), - randomize_(true), - alwaysTryLast_(true) -{ -} - -TSocketPool::TSocketPool(const string& host, int port) : TSocket(), - numRetries_(1), - retryInterval_(60), - maxConsecutiveFailures_(1), - randomize_(true), - alwaysTryLast_(true) -{ - addServer(host, port); -} - -TSocketPool::~TSocketPool() { - vector< shared_ptr<TSocketPoolServer> >::const_iterator iter = servers_.begin(); - vector< shared_ptr<TSocketPoolServer> >::const_iterator iterEnd = servers_.end(); - for (; iter != iterEnd; ++iter) { - setCurrentServer(*iter); - TSocketPool::close(); - } -} - -void TSocketPool::addServer(const string& host, int port) { - servers_.push_back(shared_ptr<TSocketPoolServer>(new TSocketPoolServer(host, port))); -} - -void TSocketPool::addServer(shared_ptr<TSocketPoolServer> &server) { - if (server) { - servers_.push_back(server); - } -} - -void TSocketPool::setServers(const vector< shared_ptr<TSocketPoolServer> >& servers) { - servers_ = servers; -} - -void TSocketPool::getServers(vector< shared_ptr<TSocketPoolServer> >& servers) { - servers = servers_; -} - -void TSocketPool::setNumRetries(int numRetries) { - numRetries_ = numRetries; -} - -void TSocketPool::setRetryInterval(int retryInterval) { - retryInterval_ = retryInterval; -} - - -void TSocketPool::setMaxConsecutiveFailures(int maxConsecutiveFailures) { - maxConsecutiveFailures_ = maxConsecutiveFailures; -} - -void TSocketPool::setRandomize(bool randomize) { - randomize_ = randomize; -} - -void TSocketPool::setAlwaysTryLast(bool alwaysTryLast) { - alwaysTryLast_ = alwaysTryLast; -} - -void TSocketPool::setCurrentServer(const shared_ptr<TSocketPoolServer> &server) { - currentServer_ = server; - host_ = server->host_; - port_ = server->port_; - socket_ = server->socket_; -} - -/** - * This function throws an exception if socket open fails. When socket - * opens fails, the socket in the current server is reset. - */ -/* TODO: without apc we ignore a lot of functionality from the php version */ -void TSocketPool::open() { - - size_t numServers = servers_.size(); - if (numServers == 0) { - socket_ = THRIFT_INVALID_SOCKET; - throw TTransportException(TTransportException::NOT_OPEN); - } - - if (isOpen()) { - return; - } - - if (randomize_ && numServers > 1) { - random_shuffle(servers_.begin(), servers_.end()); - } - - for (size_t i = 0; i < numServers; ++i) { - - shared_ptr<TSocketPoolServer> &server = servers_[i]; - // Impersonate the server socket - setCurrentServer(server); - - if (isOpen()) { - // already open means we're done - return; - } - - bool retryIntervalPassed = (server->lastFailTime_ == 0); - bool isLastServer = alwaysTryLast_ ? (i == (numServers - 1)) : false; - - if (server->lastFailTime_ > 0) { - // The server was marked as down, so check if enough time has elapsed to retry - time_t elapsedTime = time(NULL) - server->lastFailTime_; - if (elapsedTime > retryInterval_) { - retryIntervalPassed = true; - } - } - - if (retryIntervalPassed || isLastServer) { - for (int j = 0; j < numRetries_; ++j) { - try { - TSocket::open(); - } catch (TException e) { - string errStr = "TSocketPool::open failed "+getSocketInfo()+": "+e.what(); - GlobalOutput(errStr.c_str()); - socket_ = THRIFT_INVALID_SOCKET; - continue; - } - - // Copy over the opened socket so that we can keep it persistent - server->socket_ = socket_; - // reset lastFailTime_ is required - server->lastFailTime_ = 0; - // success - return; - } - - ++server->consecutiveFailures_; - if (server->consecutiveFailures_ > maxConsecutiveFailures_) { - // Mark server as down - server->consecutiveFailures_ = 0; - server->lastFailTime_ = time(NULL); - } - } - } - - GlobalOutput("TSocketPool::open: all connections failed"); - throw TTransportException(TTransportException::NOT_OPEN); -} - -void TSocketPool::close() { - TSocket::close(); - if (currentServer_) { - currentServer_->socket_ = THRIFT_INVALID_SOCKET; - } -} - -}}} // apache::thrift::transport http://git-wip-us.apache.org/repos/asf/airavata-php-gateway/blob/d55608f1/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TSocketPool.h ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TSocketPool.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TSocketPool.h deleted file mode 100644 index 7728257..0000000 --- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TSocketPool.h +++ /dev/null @@ -1,196 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#ifndef _THRIFT_TRANSPORT_TSOCKETPOOL_H_ -#define _THRIFT_TRANSPORT_TSOCKETPOOL_H_ 1 - -#include <vector> -#include <thrift/transport/TSocket.h> - -namespace apache { namespace thrift { namespace transport { - - /** - * Class to hold server information for TSocketPool - * - */ -class TSocketPoolServer { - - public: - /** - * Default constructor for server info - */ - TSocketPoolServer(); - - /** - * Constructor for TSocketPool server - */ - TSocketPoolServer(const std::string &host, int port); - - // Host name - std::string host_; - - // Port to connect on - int port_; - - // Socket for the server - THRIFT_SOCKET socket_; - - // Last time connecting to this server failed - time_t lastFailTime_; - - // Number of consecutive times connecting to this server failed - int consecutiveFailures_; -}; - -/** - * TCP Socket implementation of the TTransport interface. - * - */ -class TSocketPool : public TSocket { - - public: - - /** - * Socket pool constructor - */ - TSocketPool(); - - /** - * Socket pool constructor - * - * @param hosts list of host names - * @param ports list of port names - */ - TSocketPool(const std::vector<std::string> &hosts, - const std::vector<int> &ports); - - /** - * Socket pool constructor - * - * @param servers list of pairs of host name and port - */ - TSocketPool(const std::vector<std::pair<std::string, int> >& servers); - - /** - * Socket pool constructor - * - * @param servers list of TSocketPoolServers - */ - TSocketPool(const std::vector< boost::shared_ptr<TSocketPoolServer> >& servers); - - /** - * Socket pool constructor - * - * @param host single host - * @param port single port - */ - TSocketPool(const std::string& host, int port); - - /** - * Destroyes the socket object, closing it if necessary. - */ - virtual ~TSocketPool(); - - /** - * Add a server to the pool - */ - void addServer(const std::string& host, int port); - - /** - * Add a server to the pool - */ - void addServer(boost::shared_ptr<TSocketPoolServer> &server); - - /** - * Set list of servers in this pool - */ - void setServers(const std::vector< boost::shared_ptr<TSocketPoolServer> >& servers); - - /** - * Get list of servers in this pool - */ - void getServers(std::vector< boost::shared_ptr<TSocketPoolServer> >& servers); - - /** - * Sets how many times to keep retrying a host in the connect function. - */ - void setNumRetries(int numRetries); - - /** - * Sets how long to wait until retrying a host if it was marked down - */ - void setRetryInterval(int retryInterval); - - /** - * Sets how many times to keep retrying a host before marking it as down. - */ - void setMaxConsecutiveFailures(int maxConsecutiveFailures); - - /** - * Turns randomization in connect order on or off. - */ - void setRandomize(bool randomize); - - /** - * Whether to always try the last server. - */ - void setAlwaysTryLast(bool alwaysTryLast); - - /** - * Creates and opens the UNIX socket. - */ - void open(); - - /* - * Closes the UNIX socket - */ - void close(); - - protected: - - void setCurrentServer(const boost::shared_ptr<TSocketPoolServer> &server); - - /** List of servers to connect to */ - std::vector< boost::shared_ptr<TSocketPoolServer> > servers_; - - /** Current server */ - boost::shared_ptr<TSocketPoolServer> currentServer_; - - /** How many times to retry each host in connect */ - int numRetries_; - - /** Retry interval in seconds, how long to not try a host if it has been - * marked as down. - */ - time_t retryInterval_; - - /** Max consecutive failures before marking a host down. */ - int maxConsecutiveFailures_; - - /** Try hosts in order? or Randomized? */ - bool randomize_; - - /** Always try last host, even if marked down? */ - bool alwaysTryLast_; -}; - -}}} // apache::thrift::transport - -#endif // #ifndef _THRIFT_TRANSPORT_TSOCKETPOOL_H_ - http://git-wip-us.apache.org/repos/asf/airavata-php-gateway/blob/d55608f1/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TTransport.h ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TTransport.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TTransport.h deleted file mode 100644 index 3b552c4..0000000 --- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TTransport.h +++ /dev/null @@ -1,270 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#ifndef _THRIFT_TRANSPORT_TTRANSPORT_H_ -#define _THRIFT_TRANSPORT_TTRANSPORT_H_ 1 - -#include <thrift/Thrift.h> -#include <boost/shared_ptr.hpp> -#include <thrift/transport/TTransportException.h> -#include <string> - -namespace apache { namespace thrift { namespace transport { - -/** - * Helper template to hoist readAll implementation out of TTransport - */ -template <class Transport_> -uint32_t readAll(Transport_ &trans, uint8_t* buf, uint32_t len) { - uint32_t have = 0; - uint32_t get = 0; - - while (have < len) { - get = trans.read(buf+have, len-have); - if (get <= 0) { - throw TTransportException(TTransportException::END_OF_FILE, - "No more data to read."); - } - have += get; - } - - return have; -} - - -/** - * Generic interface for a method of transporting data. A TTransport may be - * capable of either reading or writing, but not necessarily both. - * - */ -class TTransport { - public: - /** - * Virtual deconstructor. - */ - virtual ~TTransport() {} - - /** - * Whether this transport is open. - */ - virtual bool isOpen() { - return false; - } - - /** - * Tests whether there is more data to read or if the remote side is - * still open. By default this is true whenever the transport is open, - * but implementations should add logic to test for this condition where - * possible (i.e. on a socket). - * This is used by a server to check if it should listen for another - * request. - */ - virtual bool peek() { - return isOpen(); - } - - /** - * Opens the transport for communications. - * - * @return bool Whether the transport was successfully opened - * @throws TTransportException if opening failed - */ - virtual void open() { - throw TTransportException(TTransportException::NOT_OPEN, "Cannot open base TTransport."); - } - - /** - * Closes the transport. - */ - virtual void close() { - throw TTransportException(TTransportException::NOT_OPEN, "Cannot close base TTransport."); - } - - /** - * Attempt to read up to the specified number of bytes into the string. - * - * @param buf Reference to the location to write the data - * @param len How many bytes to read - * @return How many bytes were actually read - * @throws TTransportException If an error occurs - */ - uint32_t read(uint8_t* buf, uint32_t len) { - T_VIRTUAL_CALL(); - return read_virt(buf, len); - } - virtual uint32_t read_virt(uint8_t* /* buf */, uint32_t /* len */) { - throw TTransportException(TTransportException::NOT_OPEN, - "Base TTransport cannot read."); - } - - /** - * Reads the given amount of data in its entirety no matter what. - * - * @param s Reference to location for read data - * @param len How many bytes to read - * @return How many bytes read, which must be equal to size - * @throws TTransportException If insufficient data was read - */ - uint32_t readAll(uint8_t* buf, uint32_t len) { - T_VIRTUAL_CALL(); - return readAll_virt(buf, len); - } - virtual uint32_t readAll_virt(uint8_t* buf, uint32_t len) { - return apache::thrift::transport::readAll(*this, buf, len); - } - - /** - * Called when read is completed. - * This can be over-ridden to perform a transport-specific action - * e.g. logging the request to a file - * - * @return number of bytes read if available, 0 otherwise. - */ - virtual uint32_t readEnd() { - // default behaviour is to do nothing - return 0; - } - - /** - * Writes the string in its entirety to the buffer. - * - * Note: You must call flush() to ensure the data is actually written, - * and available to be read back in the future. Destroying a TTransport - * object does not automatically flush pending data--if you destroy a - * TTransport object with written but unflushed data, that data may be - * discarded. - * - * @param buf The data to write out - * @throws TTransportException if an error occurs - */ - void write(const uint8_t* buf, uint32_t len) { - T_VIRTUAL_CALL(); - write_virt(buf, len); - } - virtual void write_virt(const uint8_t* /* buf */, uint32_t /* len */) { - throw TTransportException(TTransportException::NOT_OPEN, - "Base TTransport cannot write."); - } - - /** - * Called when write is completed. - * This can be over-ridden to perform a transport-specific action - * at the end of a request. - * - * @return number of bytes written if available, 0 otherwise - */ - virtual uint32_t writeEnd() { - // default behaviour is to do nothing - return 0; - } - - /** - * Flushes any pending data to be written. Typically used with buffered - * transport mechanisms. - * - * @throws TTransportException if an error occurs - */ - virtual void flush() { - // default behaviour is to do nothing - } - - /** - * Attempts to return a pointer to \c len bytes, possibly copied into \c buf. - * Does not consume the bytes read (i.e.: a later read will return the same - * data). This method is meant to support protocols that need to read - * variable-length fields. They can attempt to borrow the maximum amount of - * data that they will need, then consume (see next method) what they - * actually use. Some transports will not support this method and others - * will fail occasionally, so protocols must be prepared to use read if - * borrow fails. - * - * @oaram buf A buffer where the data can be stored if needed. - * If borrow doesn't return buf, then the contents of - * buf after the call are undefined. This parameter may be - * NULL to indicate that the caller is not supplying storage, - * but would like a pointer into an internal buffer, if - * available. - * @param len *len should initially contain the number of bytes to borrow. - * If borrow succeeds, *len will contain the number of bytes - * available in the returned pointer. This will be at least - * what was requested, but may be more if borrow returns - * a pointer to an internal buffer, rather than buf. - * If borrow fails, the contents of *len are undefined. - * @return If the borrow succeeds, return a pointer to the borrowed data. - * This might be equal to \c buf, or it might be a pointer into - * the transport's internal buffers. - * @throws TTransportException if an error occurs - */ - const uint8_t* borrow(uint8_t* buf, uint32_t* len) { - T_VIRTUAL_CALL(); - return borrow_virt(buf, len); - } - virtual const uint8_t* borrow_virt(uint8_t* /* buf */, uint32_t* /* len */) { - return NULL; - } - - /** - * Remove len bytes from the transport. This should always follow a borrow - * of at least len bytes, and should always succeed. - * TODO(dreiss): Is there any transport that could borrow but fail to - * consume, or that would require a buffer to dump the consumed data? - * - * @param len How many bytes to consume - * @throws TTransportException If an error occurs - */ - void consume(uint32_t len) { - T_VIRTUAL_CALL(); - consume_virt(len); - } - virtual void consume_virt(uint32_t /* len */) { - throw TTransportException(TTransportException::NOT_OPEN, - "Base TTransport cannot consume."); - } - - protected: - /** - * Simple constructor. - */ - TTransport() {} -}; - -/** - * Generic factory class to make an input and output transport out of a - * source transport. Commonly used inside servers to make input and output - * streams out of raw clients. - * - */ -class TTransportFactory { - public: - TTransportFactory() {} - - virtual ~TTransportFactory() {} - - /** - * Default implementation does nothing, just returns the transport given. - */ - virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) { - return trans; - } - -}; - -}}} // apache::thrift::transport - -#endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORT_H_ http://git-wip-us.apache.org/repos/asf/airavata-php-gateway/blob/d55608f1/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TTransportException.cpp ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TTransportException.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TTransportException.cpp deleted file mode 100644 index 2c1f303..0000000 --- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TTransportException.cpp +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <thrift/transport/TTransportException.h> -#include <boost/lexical_cast.hpp> -#include <cstring> - -#include <thrift/thrift-config.h> - -using std::string; -using boost::lexical_cast; - -namespace apache { namespace thrift { namespace transport { - -}}} // apache::thrift::transport
