This is an automated email from the ASF dual-hosted git repository. bbender pushed a commit to branch feature/asio in repository https://gitbox.apache.org/repos/asf/geode-native.git
commit 3c8c9b3e471653448dc1ad99ce842663f8bf4601 Author: Matthew Reddington <[email protected]> AuthorDate: Mon Aug 10 15:31:24 2020 -0700 Shutdown bugs. --- cppcache/src/Connector.hpp | 15 +- cppcache/src/DistributedSystem.hpp | 2 - cppcache/src/TcpConn.cpp | 260 ++++++++++++++++++++++----- cppcache/src/TcpConn.hpp | 4 +- cppcache/src/TcpSslConn.cpp | 6 +- cppcache/src/TcpSslConn.hpp | 4 +- cppcache/src/TcrConnection.cpp | 289 +++++++++++++++---------------- cppcache/src/TcrConnection.hpp | 53 +----- cppcache/src/TcrEndpoint.cpp | 11 +- cppcache/src/ThinClientBaseDM.hpp | 1 + cppcache/src/ThinClientLocatorHelper.cpp | 24 ++- cppcache/src/ThinClientLocatorHelper.hpp | 1 + cppcache/src/ThinClientPoolDM.cpp | 37 +++- 13 files changed, 428 insertions(+), 279 deletions(-) diff --git a/cppcache/src/Connector.hpp b/cppcache/src/Connector.hpp index e308f9d..5bef232 100644 --- a/cppcache/src/Connector.hpp +++ b/cppcache/src/Connector.hpp @@ -71,7 +71,8 @@ class APACHE_GEODE_EXPORT Connector { * @exception GeodeIOException, TimeoutException, IllegalArgumentException, * OutOfMemoryException. */ - virtual size_t receive(char *b, size_t len) = 0; + virtual size_t receive(char *b, size_t len, + std::chrono::milliseconds timeout) = 0; /** * Writes <code>len</code> bytes from the specified byte array @@ -83,7 +84,8 @@ class APACHE_GEODE_EXPORT Connector { * @return the actual number of bytes written. * @exception GeodeIOException, TimeoutException, IllegalArgumentException. */ - virtual size_t send(const char *b, size_t len) = 0; + virtual size_t send(const char *b, size_t len, + std::chrono::milliseconds timeout) = 0; /** * Returns local port for this TCP connection @@ -100,8 +102,9 @@ class APACHE_GEODE_EXPORT Connector { * @exception GeodeIOException, TimeoutException */ template <typename T, size_t size> - size_t send(const T (&array)[size]) { - return send(reinterpret_cast<const char *>(array), sizeof(T) * size); + size_t send(const T (&array)[size], std::chrono::milliseconds timeout) { + return send(reinterpret_cast<const char *>(array), sizeof(T) * size, + timeout); } /** @@ -114,8 +117,8 @@ class APACHE_GEODE_EXPORT Connector { * @exception GeodeIOException, TimeoutException */ template <typename T, size_t size> - size_t receive(T (&array)[size]) { - return receive(reinterpret_cast<char *>(array), sizeof(T) * size); + size_t receive(T (&array)[size], std::chrono::milliseconds timeout) { + return receive(reinterpret_cast<char *>(array), sizeof(T) * size, timeout); } }; } // namespace client diff --git a/cppcache/src/DistributedSystem.hpp b/cppcache/src/DistributedSystem.hpp index a03ea6c..3b080b9 100644 --- a/cppcache/src/DistributedSystem.hpp +++ b/cppcache/src/DistributedSystem.hpp @@ -48,7 +48,6 @@ namespace client { class SystemProperties; class DistributedSystemImpl; class CacheRegionHelper; -class TcrConnection; class APACHE_GEODE_EXPORT DistributedSystem { /** @@ -108,7 +107,6 @@ class APACHE_GEODE_EXPORT DistributedSystem { friend class CacheRegionHelper; friend class DistributedSystemImpl; - friend class TcrConnection; }; } // namespace client } // namespace geode diff --git a/cppcache/src/TcpConn.cpp b/cppcache/src/TcpConn.cpp index 86f9a32..6ae116b 100644 --- a/cppcache/src/TcpConn.cpp +++ b/cppcache/src/TcpConn.cpp @@ -20,6 +20,7 @@ #include <iomanip> #include <iostream> +#include <boost/optional.hpp> #include <boost/system/error_code.hpp> #include <boost/system/system_error.hpp> @@ -109,12 +110,51 @@ TcpConn::TcpConn(const std::string ipaddr, connect_timeout, maxBuffSizePool} {} TcpConn::TcpConn(const std::string host, uint16_t port, - std::chrono::microseconds /*connect_timeout*/, - int32_t maxBuffSizePool) + std::chrono::microseconds timeout, int32_t maxBuffSizePool) : socket_{io_context_} { - // We must connect first so we have a valid file descriptor to set options on. - boost::asio::connect(socket_, boost::asio::ip::tcp::resolver(io_context_) - .resolve(host, std::to_string(port))); + boost::optional<boost::system::error_code> connect_result, timer_result; + boost::asio::deadline_timer connect_deadline{io_context_}; + + try { + // We must connect first so we have a valid file descriptor to set options + // on. + boost::asio::async_connect( + socket_, + boost::asio::ip::tcp::resolver(io_context_) + .resolve(host, std::to_string(port)), + [&connect_result](const boost::system::error_code &ec, + const boost::asio::ip::tcp::endpoint) -> bool { + connect_result.reset(ec); + return true; + }); + + connect_deadline.expires_from_now( + boost::posix_time::milliseconds(timeout.count())); + connect_deadline.async_wait( + [&timer_result](const boost::system::error_code &ec) { + if (ec) { + timer_result.reset(ec); + } + }); + + io_context_.reset(); + while (io_context_.run_one()) { + if (timer_result) { + socket_.cancel(); + } + if (connect_result) { + connect_deadline.cancel(); + } + } + } catch (...) { + std::cout << "Throwing an unexpected connect exception\n"; + throw; + } + + if (connect_result && *connect_result) { + std::cout << "Throwing a connect exception\n"; + throw *connect_result; + } std::stringstream ss; ss << "Connected " << socket_.local_endpoint() << " -> " @@ -161,50 +201,182 @@ TcpConn::TcpConn(const std::string ipaddr, TcpConn::~TcpConn() { std::stringstream ss; - ss << "Disconnected " << socket_.local_endpoint() << " -> " - << socket_.remote_endpoint(); + + try { + ss << "Disconnected " << socket_.local_endpoint() << " -> " + << socket_.remote_endpoint(); + + socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_both); + + } catch (...) { + ss = std::stringstream{}; + + ss << "Closed socket " << socket_.local_endpoint(); + } + + socket_.close(); + LOGFINE(ss.str()); } -size_t TcpConn::receive(char *buff, size_t len) { - auto start = std::chrono::system_clock::now(); - - return boost::asio::read(socket_, boost::asio::buffer(buff, len), - [len, start](boost::system::error_code &ec, - const std::size_t n) -> std::size_t { - if (ec && ec != boost::asio::error::eof) { - // Quit if we encounter an error. - // Defer EOF to timeout. - return 0; - } else if (start + std::chrono::milliseconds(25) <= - std::chrono::system_clock::now()) { - // Sometimes we don't know how much data to - // expect, so we're reading into an oversized - // buffer without knowing when to quit other than - // by timeout. Typically, if we timeout, we also - // have an EOF, meaning the connection is likely - // broken and will have to be closed. But if we - // have bytes, we may have just done a - // dumb/blind/hail mary receive, so defer broken - // connection handling until the next IO - // operation. - if (n) { - // This prevents the timeout from being an - // error condition. - ec = boost::system::error_code{}; - } - // But if n == 0 when we timeout, it's just a - // broken connection. - - return 0; - } - - return len - n; - }); +size_t TcpConn::receive(char *buff, const size_t len, + std::chrono::milliseconds timeout) { + std::stringstream ss; + ss << "Receiving " << len << " bytes from " << socket_.remote_endpoint() + << " -> " << socket_.local_endpoint(); + LOGDEBUG(ss.str()); + + boost::optional<boost::system::error_code> timer_result, read_result; + std::size_t bytes_read = 0; + + try { + // Here we prep the Asio subsystem for a read operation with the completion + // condition below. + boost::asio::async_read( + socket_, boost::asio::buffer(buff, len), + [&read_result, &bytes_read, len](const boost::system::error_code &ec, + const size_t n) -> size_t { + bytes_read = n; + + // Aborts come from timeouts or manual interrupts, as seen below in + // the while loop. If we timeout and haven't read anything, the + // connection is probably broken. A broken pipe is indicated by an + // EOF. + if (ec == boost::asio::error::operation_aborted && 0 == n) { + read_result.reset( + boost::system::error_code{boost::asio::error::eof}); + return 0; + } + // If we timeout and there are bytes read, that isn't necessarily an + // error; Asio presumes it's meant to fill a fixed size buffer + // exactly. The buffer may simply be too big for an expected response + // but of an unknown size. + // + // EOF itself occurs when there is no data available on the socket at + // the time of the read. It may simply imply data has yet to arrive. + // Do nothing. Defer to timeout rather than assume a broken + // connection. + // + // For every other error condition, including a timeout with data, + // complete the operation. + else if (ec && ec != boost::asio::error::eof && + ec != boost::asio::error::try_again) { + read_result.reset(ec); + return 0; + } + // Once the buffer is filled, indicate success, regardless the error + // condition on the socket. Defer to the next receive operation to + // handle that eventuality. + else if (n == len) { + read_result.reset(boost::system::error_code{}); + return 0; + } + + // As the last read was successful, continue filling the fixed size + // buffer. + return len - n; + }); + + // This timer will abort the operation after the timeout period, and that + // will be indicated within the completion handler above. + boost::asio::deadline_timer read_deadline{io_context_}; + read_deadline.expires_from_now( + boost::posix_time::milliseconds(timeout.count())); + read_deadline.async_wait( + [&timer_result](const boost::system::error_code &ec) { + if (ec) { + timer_result.reset(ec); + } + }); + + // Run until the context enters the stopped state. + io_context_.reset(); + while (io_context_.run_one()) { + // If something went wrong with the timer, abort the read. + // This will result in an aborted read result. + if (timer_result) { + socket_.cancel(); + } + if (read_result) { + read_deadline.cancel(); + } + } + } catch (...) { + std::cout << "Throwing an unexpected read exception\n"; + throw; + } + + if (read_result && *read_result) { + std::cout << "Throwing a read exception\n"; + throw *read_result; + } + + return bytes_read; } -size_t TcpConn::send(const char *buff, size_t len) { - return boost::asio::write(socket_, boost::asio::buffer(buff, len)); +size_t TcpConn::send(const char *buff, const size_t len, + std::chrono::milliseconds timeout) { + std::stringstream ss; + ss << "Sending " << len << " bytes from " << socket_.local_endpoint() + << " -> " << socket_.remote_endpoint(); + LOGDEBUG(ss.str()); + + boost::optional<boost::system::error_code> timer_result, write_result; + std::size_t bytes_written = 0; + + try { + boost::asio::async_write( + socket_, boost::asio::buffer(buff, len), + [&write_result, &bytes_written, len]( + const boost::system::error_code &ec, const size_t n) -> size_t { + bytes_written = n; + + if (ec == boost::asio::error::operation_aborted && 0 == n) { + write_result.reset( + boost::system::error_code{boost::asio::error::eof}); + return 0; + } else if (ec && ec != boost::asio::error::eof && + ec != boost::asio::error::try_again) { + write_result.reset(ec); + return 0; + } else if (n == len) { + write_result.reset(boost::system::error_code{}); + return 0; + } + + return len - n; + }); + + boost::asio::deadline_timer write_deadline{io_context_}; + write_deadline.expires_from_now( + boost::posix_time::milliseconds(timeout.count())); + write_deadline.async_wait( + [&timer_result](const boost::system::error_code &ec) { + if (ec) { + timer_result.reset(ec); + } + }); + + io_context_.reset(); + while (io_context_.run_one()) { + if (timer_result) { + socket_.cancel(); + } + if (write_result) { + write_deadline.cancel(); + } + } + } catch (...) { + std::cout << "Throwing an unexpected write exception\n"; + throw; + } + + if (write_result && *write_result) { + std::cout << "Throwing a write exception\n"; + throw *write_result; + } + + return bytes_written; } // Return the local port for this TCP connection. diff --git a/cppcache/src/TcpConn.hpp b/cppcache/src/TcpConn.hpp index fef274a..50f6552 100644 --- a/cppcache/src/TcpConn.hpp +++ b/cppcache/src/TcpConn.hpp @@ -30,8 +30,8 @@ namespace apache { namespace geode { namespace client { class APACHE_GEODE_EXPORT TcpConn : public Connector { - size_t receive(char* buff, size_t len) override; - size_t send(const char* buff, size_t len) override; + size_t receive(char*, size_t, std::chrono::milliseconds) override; + size_t send(const char*, size_t, std::chrono::milliseconds) override; uint16_t getPort() override final; diff --git a/cppcache/src/TcpSslConn.cpp b/cppcache/src/TcpSslConn.cpp index 608484b..2e1d035 100644 --- a/cppcache/src/TcpSslConn.cpp +++ b/cppcache/src/TcpSslConn.cpp @@ -91,7 +91,8 @@ TcpSslConn::~TcpSslConn() { LOGFINE(ss.str()); } -size_t TcpSslConn::receive(char *buff, size_t len) { +size_t TcpSslConn::receive(char *buff, const size_t len, + std::chrono::milliseconds) { auto start = std::chrono::system_clock::now(); return boost::asio::read(*socket_stream_, boost::asio::buffer(buff, len), @@ -128,7 +129,8 @@ size_t TcpSslConn::receive(char *buff, size_t len) { }); } -size_t TcpSslConn::send(const char *buff, size_t len) { +size_t TcpSslConn::send(const char *buff, const size_t len, + std::chrono::milliseconds) { return boost::asio::write(*socket_stream_, boost::asio::buffer(buff, len)); } diff --git a/cppcache/src/TcpSslConn.hpp b/cppcache/src/TcpSslConn.hpp index d5ca292..e2ddf0b 100644 --- a/cppcache/src/TcpSslConn.hpp +++ b/cppcache/src/TcpSslConn.hpp @@ -29,8 +29,8 @@ namespace geode { namespace client { class TcpSslConn : public TcpConn { - size_t receive(char* buff, size_t len) override final; - size_t send(const char* buff, size_t len) override final; + size_t receive(char*, size_t, std::chrono::milliseconds) override final; + size_t send(const char*, size_t, std::chrono::milliseconds) override final; using ssl_stream_type = boost::asio::ssl::stream<boost::asio::ip::tcp::socket&>; diff --git a/cppcache/src/TcrConnection.cpp b/cppcache/src/TcrConnection.cpp index 14bda1a..7d098ba 100644 --- a/cppcache/src/TcrConnection.cpp +++ b/cppcache/src/TcrConnection.cpp @@ -34,34 +34,85 @@ #include "Utils.hpp" #include "Version.hpp" -namespace apache { -namespace geode { -namespace client { - -const int HEADER_LENGTH = 17; -const int CHUNK_HEADER_LENGTH = 5; -const int8_t LAST_CHUNK_MASK = 0x1; -const int64_t INITIAL_CONNECTION_ID = 26739; - #define throwException(ex) \ do { \ LOGFINEST(ex.getName() + ": " + ex.getMessage()); \ throw ex; \ } while (0) +namespace { +bool useReplyTimeout(const apache::geode::client::TcrMessage& request) { + switch (request.getMessageType()) { + case apache::geode::client::TcrMessage::QUERY: + case apache::geode::client::TcrMessage::QUERY_WITH_PARAMETERS: + case apache::geode::client::TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE: + case apache::geode::client::TcrMessage::GETDURABLECQS_MSG_TYPE: + case apache::geode::client::TcrMessage::EXECUTE_FUNCTION: + case apache::geode::client::TcrMessage::EXECUTE_REGION_FUNCTION: + case apache::geode::client::TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP: + return true; + default: + break; + } + + return false; +} + +int expiryTimeVariancePercentage() { + auto nowTimePoint = std::chrono::steady_clock::now().time_since_epoch(); + auto now_ms = + std::chrono::duration_cast<std::chrono::milliseconds>(nowTimePoint) + .count(); + auto now_s = + std::chrono::duration_cast<std::chrono::seconds>(nowTimePoint).count(); + + srand(static_cast<unsigned int>((now_s * 1000) + (now_ms / 1000))); + + const int numbers = 21; + int random = rand() % numbers + 1; + + if (random > 10) { + random = random - numbers; + } + return random; +} + +const int HEADER_LENGTH = 17; +const int CHUNK_HEADER_LENGTH = 5; +const int8_t LAST_CHUNK_MASK = 0x1; +const int64_t INITIAL_CONNECTION_ID = 26739; + struct FinalizeProcessChunk { private: - TcrMessage& m_reply; + apache::geode::client::TcrMessage& m_reply; uint16_t m_endpointMemId; public: - FinalizeProcessChunk(TcrMessageReply& reply, uint16_t endpointMemId) + FinalizeProcessChunk(apache::geode::client::TcrMessageReply& reply, + uint16_t endpointMemId) : m_reply(reply), m_endpointMemId(endpointMemId) {} ~FinalizeProcessChunk() noexcept(false) { // Enqueue a nullptr chunk indicating a wait for processing to complete. m_reply.processChunk(std::vector<uint8_t>(), 0, m_endpointMemId); } }; +} // namespace + +namespace apache { +namespace geode { +namespace client { + +TcrConnection::TcrConnection(const TcrConnectionManager& connectionManager) + : connectionId(0), + m_connectionManager(connectionManager), + m_expiryTimeVariancePercentage{expiryTimeVariancePercentage()}, + m_hasServerQueue(NON_REDUNDANT_SERVER), + m_queueSize(0), + m_port(0), + m_chunksProcessSema(0), + m_isBeingUsed(false), + m_isUsed(0), + m_poolDM(nullptr) {} bool TcrConnection::initTcrConnection( std::shared_ptr<TcrEndpoint> endpointObj, @@ -72,9 +123,8 @@ bool TcrConnection::initTcrConnection( m_poolDM = dynamic_cast<ThinClientPoolDM*>(m_endpointObj->getPoolHADM()); m_hasServerQueue = NON_REDUNDANT_SERVER; m_queueSize = 0; - m_creationTime = clock::now(); + m_lastAccessed = m_creationTime = std::chrono::steady_clock::now(); connectionId = INITIAL_CONNECTION_ID; - m_lastAccessed = clock::now(); auto cacheImpl = m_poolDM->getConnectionManager().getCacheImpl(); const auto& distributedSystem = cacheImpl->getDistributedSystem(); const auto& sysProp = distributedSystem.getSystemProperties(); @@ -246,7 +296,7 @@ bool TcrConnection::initTcrConnection( endpointObj->name().c_str(), isClientNotification ? (isSecondary ? "secondary " : "primary ") : "", isClientNotification ? "subscription" : "client"); - ConnErrType error = sendData(data, msgLength, connectTimeout, false); + ConnErrType error = sendData(data, msgLength, connectTimeout); if (error == CONN_NOERR) { auto acceptanceCode = readHandshakeData(1, connectTimeout); @@ -290,12 +340,10 @@ bool TcrConnection::initTcrConnection( /////////////////////////////////// ////////////////////////// 3. Only when handshake is for subscription /////////////////////////////////// - if (m_poolDM != nullptr) { - if ((m_hasServerQueue == PRIMARY_SERVER || - m_hasServerQueue == NON_REDUNDANT_SERVER) && - isClientNotification) { - m_poolDM->setPrimaryServerQueueSize(queueSize); - } + if ((m_hasServerQueue == PRIMARY_SERVER || + m_hasServerQueue == NON_REDUNDANT_SERVER) && + isClientNotification) { + m_poolDM->setPrimaryServerQueueSize(queueSize); } if (!isClientNotification) { @@ -317,7 +365,7 @@ bool TcrConnection::initTcrConnection( } auto recvMsgLenBytes = readHandshakeData(2, connectTimeout); - auto dataInput3 = m_connectionManager->getCacheImpl()->createDataInput( + auto dataInput3 = m_connectionManager.getCacheImpl()->createDataInput( reinterpret_cast<const uint8_t*>(recvMsgLenBytes.data()), recvMsgLenBytes.size()); uint16_t recvMsgLen2 = dataInput3.readInt16(); @@ -325,7 +373,7 @@ bool TcrConnection::initTcrConnection( if (!isClientNotification) { auto deltaEnabledMsg = readHandshakeData(1, connectTimeout); - auto di = m_connectionManager->getCacheImpl()->createDataInput( + auto di = m_connectionManager.getCacheImpl()->createDataInput( reinterpret_cast<const uint8_t*>(deltaEnabledMsg.data()), 1); ThinClientBaseDM::setDeltaEnabledOnServer(di.readBoolean()); } @@ -415,7 +463,7 @@ bool TcrConnection::initTcrConnection( void TcrConnection::createConnection(const char* endpoint, std::chrono::microseconds connectTimeout, int32_t maxBuffSizePool) { - auto& systemProperties = m_connectionManager->getCacheImpl() + auto& systemProperties = m_connectionManager.getCacheImpl() ->getDistributedSystem() .getSystemProperties(); @@ -429,94 +477,47 @@ void TcrConnection::createConnection(const char* endpoint, } } -/* The timeout behaviour for different methods is as follows: - * receive(): - * Header: default timeout - * Body: default timeout - * sendRequest()/sendRequestForChunkedResponse(): - * default timeout during send; for receive: - * Header: default timeout * default timeout retries to handle large payload - * if a timeout other than default timeout is specified then - * that is used instead - * Body: default timeout - */ ConnErrType TcrConnection::receiveData( - char* buffer, size_t length, std::chrono::microseconds operation_window, - bool /*checkConnected*/, bool isNotificationMessage) { - auto startLen = length; - - while (length > 0 && operation_window > decltype(operation_window)::zero()) { - const auto start = std::chrono::system_clock::now(); - - std::size_t readBytes = 0; - - try { - readBytes = m_conn->receive(buffer, length); - } catch (boost::system::system_error& ex) { - switch (ex.code().value()) { - case boost::asio::error::eof: - LOGDEBUG("Connection:receive: \"%s\"", ex.what()); - case boost::asio::error::try_again: - break; - default: { - LOGERROR("Connection:receive: \"%s\"", ex.what()); - return CONN_IOERR; - } - } - } - - length -= readBytes; - buffer += readBytes; - - if (m_poolDM != nullptr) { - m_poolDM->getStats().incReceivedBytes(static_cast<int64_t>(readBytes)); - } - - if ((length == startLen) && isNotificationMessage) { // no data read - break; + char* buffer, const size_t length, + const std::chrono::microseconds timeout) { + try { + const auto readBytes = m_conn->receive( + buffer, length, + std::chrono::duration_cast<std::chrono::milliseconds>(timeout)); + + m_poolDM->getStats().incReceivedBytes(static_cast<int64_t>(readBytes)); + } catch (boost::system::system_error& ex) { + switch (ex.code().value()) { + case boost::asio::error::eof: + return CONN_NODATA; + case boost::asio::error::operation_aborted: + return CONN_TIMEOUT; + default: + break; } - - operation_window -= std::chrono::duration_cast<decltype(operation_window)>( - std::chrono::system_clock::now() - start); - } - // Postconditions for checking bounds. - if (length == 0) { - return CONN_NOERR; - } else if (length == startLen) { - return CONN_NODATA; + return CONN_IOERR; } - return CONN_TIMEOUT; + return CONN_NOERR; } ConnErrType TcrConnection::sendData(const char* buffer, size_t length, - std::chrono::microseconds operation_window, - bool /*checkConnected*/) { - while (length > 0 && operation_window > std::chrono::microseconds::zero()) { - const auto start = std::chrono::system_clock::now(); - - std::size_t sentBytes = 0; - try { - sentBytes = m_conn->send(buffer, length); - } catch (boost::system::system_error& ex) { - switch (ex.code().value()) { - case boost::asio::error::try_again: - break; - default: { - LOGERROR("Connection:send: \"%s\"", ex.what()); - return CONN_IOERR; - } - } + std::chrono::microseconds timeout) { + try { + m_conn->send( + buffer, length, + std::chrono::duration_cast<std::chrono::milliseconds>(timeout)); + } catch (boost::system::system_error& ex) { + switch (ex.code().value()) { + case boost::asio::error::operation_aborted: + return CONN_TIMEOUT; + default: + break; } - - length -= sentBytes; - buffer += sentBytes; - - operation_window -= std::chrono::duration_cast<decltype(operation_window)>( - std::chrono::system_clock::now() - start); + return CONN_IOERR; } - return length == 0 ? CONN_NOERR : CONN_TIMEOUT; + return CONN_NOERR; } char* TcrConnection::sendRequest(const char* buffer, size_t len, @@ -559,17 +560,6 @@ void TcrConnection::sendRequestForChunkedResponse( } } -bool TcrConnection::useReplyTimeout(const TcrMessage& request) const { - auto messageType = request.getMessageType(); - return ((messageType == TcrMessage::QUERY) || - (messageType == TcrMessage::QUERY_WITH_PARAMETERS) || - (messageType == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE) || - (messageType == TcrMessage::GETDURABLECQS_MSG_TYPE) || - (messageType == TcrMessage::EXECUTE_FUNCTION) || - (messageType == TcrMessage::EXECUTE_REGION_FUNCTION) || - (messageType == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP)); -} - std::chrono::microseconds TcrConnection::sendWithTimeouts( const char* data, size_t len, std::chrono::microseconds sendTimeout, std::chrono::microseconds receiveTimeout) { @@ -608,29 +598,20 @@ bool TcrConnection::replyHasResult(const TcrMessage& request, void TcrConnection::send(const char* buffer, size_t len, std::chrono::microseconds sendTimeoutSec, bool) { - // LOGINFO("TcrConnection::send: [%p] sending request to endpoint %s;", - //: this, m_endpointObj->name().c_str()); - LOGDEBUG( "TcrConnection::send: [%p] sending request to endpoint %s; bytes: %s", this, m_endpointObj->name().c_str(), Utils::convertBytesToString(buffer, len).c_str()); - ConnErrType error = sendData(buffer, len, sendTimeoutSec); - - LOGFINER( - "TcrConnection::send: completed send request to endpoint %s " - "with error: %d", - m_endpointObj->name().c_str(), error); - - if (error != CONN_NOERR) { - if (error == CONN_TIMEOUT) { + switch (sendData(buffer, len, sendTimeoutSec)) { + case CONN_NOERR: + break; + case CONN_TIMEOUT: throwException( TimeoutException("TcrConnection::send: connection timed out")); - } else { + default: throwException( GeodeIOException("TcrConnection::send: connection failure")); - } } } @@ -653,8 +634,7 @@ char* TcrConnection::readMessage(size_t* recvLen, headerTimeout = DEFAULT_READ_TIMEOUT * DEFAULT_TIMEOUT_RETRIES; } - error = receiveData(msg_header, HEADER_LENGTH, headerTimeout, true, - isNotificationMessage); + error = receiveData(msg_header, HEADER_LENGTH, headerTimeout); if (error != CONN_NOERR) { // the !isNotificationMessage ensures that notification channel @@ -689,7 +669,7 @@ char* TcrConnection::readMessage(size_t* recvLen, this, m_endpointObj->name().c_str(), Utils::convertBytesToString(msg_header, HEADER_LENGTH).c_str()); - auto input = m_connectionManager->getCacheImpl()->createDataInput( + auto input = m_connectionManager.getCacheImpl()->createDataInput( reinterpret_cast<uint8_t*>(msg_header), HEADER_LENGTH); // ignore msgType input.readInt32(); @@ -714,8 +694,7 @@ char* TcrConnection::readMessage(size_t* recvLen, if (isNotificationMessage) { mesgBodyTimeout = receiveTimeoutSec * DEFAULT_TIMEOUT_RETRIES; } - error = receiveData(fullMessage + HEADER_LENGTH, msgLen, mesgBodyTimeout, - true, isNotificationMessage); + error = receiveData(fullMessage + HEADER_LENGTH, msgLen, mesgBodyTimeout); if (error != CONN_NOERR) { delete[] fullMessage; // the !isNotificationMessage ensures that notification channel @@ -808,7 +787,7 @@ chunkedResponseHeader TcrConnection::readResponseHeader( chunkedResponseHeader header; auto error = receiveData(reinterpret_cast<char*>(receiveBuffer), - HEADER_LENGTH, timeout, true, false); + HEADER_LENGTH, timeout); if (error != CONN_NOERR) { if (error & CONN_TIMEOUT) { throwException(TimeoutException( @@ -827,7 +806,7 @@ chunkedResponseHeader TcrConnection::readResponseHeader( m_endpointObj->name().c_str(), Utils::convertBytesToString(receiveBuffer, HEADER_LENGTH).c_str()); - auto input = m_connectionManager->getCacheImpl()->createDataInput( + auto input = m_connectionManager.getCacheImpl()->createDataInput( receiveBuffer, HEADER_LENGTH); header.messageType = input.readInt32(); header.numberOfParts = input.readInt32(); @@ -850,7 +829,7 @@ chunkHeader TcrConnection::readChunkHeader(std::chrono::microseconds timeout) { chunkHeader header; auto error = receiveData(reinterpret_cast<char*>(receiveBuffer), - CHUNK_HEADER_LENGTH, timeout, true, false); + CHUNK_HEADER_LENGTH, timeout); if (error != CONN_NOERR) { if (error & CONN_TIMEOUT) { throwException(TimeoutException( @@ -869,7 +848,7 @@ chunkHeader TcrConnection::readChunkHeader(std::chrono::microseconds timeout) { m_endpointObj->name().c_str(), Utils::convertBytesToString(receiveBuffer, CHUNK_HEADER_LENGTH).c_str()); - auto input = m_connectionManager->getCacheImpl()->createDataInput( + auto input = m_connectionManager.getCacheImpl()->createDataInput( receiveBuffer, CHUNK_HEADER_LENGTH); header.chunkLength = input.readInt32(); header.flags = input.read(); @@ -885,7 +864,7 @@ std::vector<uint8_t> TcrConnection::readChunkBody( std::chrono::microseconds timeout, int32_t chunkLength) { std::vector<uint8_t> chunkBody(chunkLength); auto error = receiveData(reinterpret_cast<char*>(chunkBody.data()), - chunkLength, timeout, true, false); + chunkLength, timeout); if (error != CONN_NOERR) { if (error & CONN_TIMEOUT) { throwException( @@ -930,7 +909,7 @@ void TcrConnection::close() { m_poolDM->getConnectionManager().getCacheImpl()); try { if (!TcrConnectionManager::TEST_DURABLE_CLIENT_CRASH && - !m_connectionManager->isNetDown()) { + !m_connectionManager.isNetDown()) { send(closeMsg->getMsgData(), closeMsg->getMsgLength(), std::chrono::seconds(2), false); } @@ -953,7 +932,7 @@ std::vector<int8_t> TcrConnection::readHandshakeData( return message; } if ((error = receiveData(reinterpret_cast<char*>(message.data()), msgLength, - connectTimeout, false)) != CONN_NOERR) { + connectTimeout)) != CONN_NOERR) { m_conn.reset(); if (error & CONN_TIMEOUT) { throwException( @@ -980,7 +959,7 @@ std::shared_ptr<CacheableBytes> TcrConnection::readHandshakeRawData( } std::vector<int8_t> message(msgLength); if ((error = receiveData(reinterpret_cast<char*>(message.data()), msgLength, - connectTimeout, false)) != CONN_NOERR) { + connectTimeout)) != CONN_NOERR) { m_conn.reset(); if (error & CONN_TIMEOUT) { throwException( @@ -1010,13 +989,13 @@ int32_t TcrConnection::readHandshakeArraySize( int32_t arrayLength = static_cast<uint8_t>(arrayLenHeader[0]); if (static_cast<int8_t>(arrayLenHeader[0]) == -2) { auto arrayLengthBytes = readHandshakeData(2, connectTimeout); - auto dataInput2 = m_connectionManager->getCacheImpl()->createDataInput( + auto dataInput2 = m_connectionManager.getCacheImpl()->createDataInput( reinterpret_cast<const uint8_t*>(arrayLengthBytes.data()), arrayLengthBytes.size()); arrayLength = dataInput2.readInt16(); } else if (static_cast<int8_t>(arrayLenHeader[0]) == -3) { auto arrayLengthBytes = readHandshakeData(4, connectTimeout); - auto dataInput2 = m_connectionManager->getCacheImpl()->createDataInput( + auto dataInput2 = m_connectionManager.getCacheImpl()->createDataInput( reinterpret_cast<const uint8_t*>(arrayLengthBytes.data()), arrayLengthBytes.size()); arrayLength = dataInput2.readInt32(); @@ -1061,7 +1040,7 @@ void TcrConnection::readHandShakeBytes( _GEODE_NEW(recvMessage, uint8_t[numberOfBytes]); if ((error = receiveData(reinterpret_cast<char*>(recvMessage), numberOfBytes, - connectTimeout, false)) != CONN_NOERR) { + connectTimeout)) != CONN_NOERR) { if (error & CONN_TIMEOUT) { _GEODE_SAFE_DELETE_ARRAY(recvMessage); m_conn.reset(); @@ -1087,7 +1066,7 @@ int32_t TcrConnection::readHandShakeInt( _GEODE_NEW(recvMessage, uint8_t[4]); if ((error = receiveData(reinterpret_cast<char*>(recvMessage), 4, - connectTimeout, false)) != CONN_NOERR) { + connectTimeout)) != CONN_NOERR) { if (error & CONN_TIMEOUT) { _GEODE_SAFE_DELETE_ARRAY(recvMessage); m_conn.reset(); @@ -1103,8 +1082,7 @@ int32_t TcrConnection::readHandShakeInt( } } - auto di = - m_connectionManager->getCacheImpl()->createDataInput(recvMessage, 4); + auto di = m_connectionManager.getCacheImpl()->createDataInput(recvMessage, 4); int32_t val = di.readInt32(); _GEODE_SAFE_DELETE_ARRAY(recvMessage); @@ -1117,7 +1095,7 @@ std::shared_ptr<CacheableString> TcrConnection::readHandshakeString( ConnErrType error = CONN_NOERR; char cstypeid; - if (receiveData(&cstypeid, 1, connectTimeout, false) != CONN_NOERR) { + if (receiveData(&cstypeid, 1, connectTimeout) != CONN_NOERR) { m_conn.reset(); if (error & CONN_TIMEOUT) { LOGFINE("Timeout receiving string typeid"); @@ -1141,7 +1119,7 @@ std::shared_ptr<CacheableString> TcrConnection::readHandshakeString( } case DSCode::CacheableASCIIString: { auto lenBytes = readHandshakeData(2, connectTimeout); - auto lenDI = m_connectionManager->getCacheImpl()->createDataInput( + auto lenDI = m_connectionManager.getCacheImpl()->createDataInput( reinterpret_cast<const uint8_t*>(lenBytes.data()), lenBytes.size()); length = lenDI.readInt16(); @@ -1164,8 +1142,8 @@ std::shared_ptr<CacheableString> TcrConnection::readHandshakeString( std::vector<char> recvMessage(length + 1); recvMessage[length] = '\0'; - if ((error = receiveData(recvMessage.data(), length, connectTimeout, - false)) != CONN_NOERR) { + if ((error = receiveData(recvMessage.data(), length, connectTimeout)) != + CONN_NOERR) { if (error & CONN_TIMEOUT) { m_conn.reset(); LOGFINE("Timeout receiving string data"); @@ -1192,7 +1170,8 @@ bool TcrConnection::hasExpired(const std::chrono::milliseconds& expiryTime) { } auto variadicExpiryTime = expiryTime + (expiryTime * m_expiryTimeVariancePercentage) / 100; - return (clock::now() - m_creationTime) > variadicExpiryTime; + return (std::chrono::steady_clock::now() - m_creationTime) > + variadicExpiryTime; } bool TcrConnection::isIdle(const std::chrono::milliseconds& idleTime) { @@ -1200,12 +1179,14 @@ bool TcrConnection::isIdle(const std::chrono::milliseconds& idleTime) { return false; } - return (clock::now() - m_lastAccessed) > idleTime; + return (std::chrono::steady_clock::now() - m_lastAccessed) > idleTime; } -void TcrConnection::touch() { m_lastAccessed = clock::now(); } +void TcrConnection::touch() { + m_lastAccessed = std::chrono::steady_clock::now(); +} -TcrConnection::time_point TcrConnection::getLastAccessed() { +std::chrono::steady_clock::time_point TcrConnection::getLastAccessed() { return m_lastAccessed; } @@ -1223,7 +1204,7 @@ uint8_t TcrConnection::getOverrides(const SystemProperties* props) { } void TcrConnection::updateCreationTime() { - m_creationTime = clock::now(); + m_creationTime = std::chrono::steady_clock::now(); touch(); } diff --git a/cppcache/src/TcrConnection.hpp b/cppcache/src/TcrConnection.hpp index d3c1a7f..8ed9398 100644 --- a/cppcache/src/TcrConnection.hpp +++ b/cppcache/src/TcrConnection.hpp @@ -85,9 +85,6 @@ class ThinClientPoolDM; class TcrConnectionManager; class TcrConnection { public: - using clock = std::chrono::steady_clock; - using time_point = clock::time_point; - /** Create one connection, endpoint is in format of hostname:portno * It will do handshake with j-server. There're 2 types of handshakes: * 1) handshake for request @@ -118,36 +115,7 @@ class TcrConnection { bool isClientNotification = false, bool isSecondary = false, std::chrono::microseconds connectTimeout = DEFAULT_CONNECT_TIMEOUT); - TcrConnection(const TcrConnectionManager& connectionManager, - volatile const bool& isConnected) - : connectionId(0), - m_connectionManager(&connectionManager), - m_connected(isConnected), - m_conn(nullptr), - m_hasServerQueue(NON_REDUNDANT_SERVER), - m_queueSize(0), - m_port(0), - m_chunksProcessSema(0), - m_isBeingUsed(false), - m_isUsed(0), - m_poolDM(nullptr) { - auto nowTimePoint = clock::now().time_since_epoch(); - auto now_ms = - std::chrono::duration_cast<std::chrono::milliseconds>(nowTimePoint) - .count(); - auto now_s = - std::chrono::duration_cast<std::chrono::seconds>(nowTimePoint).count(); - auto seed = (now_s * 1000) + (now_ms / 1000); - srand(static_cast<unsigned int>(seed)); - int numbers = 21; - int random = rand() % numbers + 1; - if (random > 10) { - random = random - numbers; - } - m_expiryTimeVariancePercentage = random; - LOGDEBUG("m_expiryTimeVariancePercentage set to: %d", - m_expiryTimeVariancePercentage); - } + TcrConnection(const TcrConnectionManager& connectionManager); /* destroy the connection */ ~TcrConnection(); @@ -287,7 +255,7 @@ class TcrConnection { void touch(); bool hasExpired(const std::chrono::milliseconds& expiryTime); bool isIdle(const std::chrono::milliseconds& idleTime); - time_point getLastAccessed(); + std::chrono::steady_clock::time_point getLastAccessed(); void updateCreationTime(); int64_t getConnectionId() { @@ -301,12 +269,12 @@ class TcrConnection { } const TcrConnectionManager& getConnectionManager() { - return *m_connectionManager; + return m_connectionManager; } private: int64_t connectionId; - const TcrConnectionManager* m_connectionManager; + const TcrConnectionManager& m_connectionManager; int m_expiryTimeVariancePercentage = 0; std::chrono::microseconds calculateHeaderTimeout( @@ -386,19 +354,15 @@ class TcrConnection { * Send data to the connection till sendTimeout */ ConnErrType sendData(const char* buffer, size_t length, - std::chrono::microseconds sendTimeout, - bool checkConnected = true); + std::chrono::microseconds sendTimeout); /** * Read data from the connection till receiveTimeoutSec */ ConnErrType receiveData(char* buffer, size_t length, - std::chrono::microseconds receiveTimeoutSec, - bool checkConnected = true, - bool isNotificationMessage = false); + std::chrono::microseconds receiveTimeoutSec); std::shared_ptr<TcrEndpoint> m_endpointObj; - volatile const bool& m_connected; std::unique_ptr<Connector> m_conn; ServerQueueStatus m_hasServerQueue; int32_t m_queueSize; @@ -407,8 +371,8 @@ class TcrConnection { // semaphore to synchronize with the chunked response processing thread ACE_Semaphore m_chunksProcessSema; - time_point m_creationTime; - time_point m_lastAccessed; + std::chrono::steady_clock::time_point m_creationTime; + std::chrono::steady_clock::time_point m_lastAccessed; // Disallow copy constructor and assignment operator. TcrConnection(const TcrConnection&); @@ -416,7 +380,6 @@ class TcrConnection { volatile bool m_isBeingUsed; std::atomic<uint32_t> m_isUsed; ThinClientPoolDM* m_poolDM; - bool useReplyTimeout(const TcrMessage& request) const; std::chrono::microseconds sendWithTimeouts( const char* data, size_t len, std::chrono::microseconds sendTimeout, std::chrono::microseconds receiveTimeout); diff --git a/cppcache/src/TcrEndpoint.cpp b/cppcache/src/TcrEndpoint.cpp index a731da0..4be0d08 100644 --- a/cppcache/src/TcrEndpoint.cpp +++ b/cppcache/src/TcrEndpoint.cpp @@ -143,8 +143,7 @@ GfErrType TcrEndpoint::createNewConnectionWL( if (locked) { try { LOGFINE("TcrEndpoint::createNewConnectionWL got lock"); - newConn = - new TcrConnection(m_cacheImpl->tcrConnectionManager(), m_connected); + newConn = new TcrConnection(m_cacheImpl->tcrConnectionManager()); newConn->initTcrConnection(shared_from_this(), m_ports, isClientNotification, isSecondary, connectTimeout); @@ -192,8 +191,7 @@ GfErrType TcrEndpoint::createNewConnection( try { if (newConn == nullptr) { if (!needtoTakeConnectLock() || !appThreadRequest) { - newConn = new TcrConnection(m_cacheImpl->tcrConnectionManager(), - m_connected); + newConn = new TcrConnection(m_cacheImpl->tcrConnectionManager()); bool authenticate = newConn->initTcrConnection( shared_from_this(), m_ports, isClientNotification, isSecondary, connectTimeout); @@ -1170,7 +1168,10 @@ void TcrEndpoint::triggerRedundancyThread() { void TcrEndpoint::closeConnection(TcrConnection*& conn) { conn->close(); m_ports.erase(conn->getPort()); - _GEODE_SAFE_DELETE(conn); + try { + _GEODE_SAFE_DELETE(conn); + } catch (...) { + } } void TcrEndpoint::closeConnections() { diff --git a/cppcache/src/ThinClientBaseDM.hpp b/cppcache/src/ThinClientBaseDM.hpp index 2920876..61c3fe3 100644 --- a/cppcache/src/ThinClientBaseDM.hpp +++ b/cppcache/src/ThinClientBaseDM.hpp @@ -37,6 +37,7 @@ namespace client { class TcrMessage; class ThinClientRegion; class TcrEndpoint; +class TcrConnection; class TcrConnectionManager; class TcrMessageReply; class TcrChunkedContext; diff --git a/cppcache/src/ThinClientLocatorHelper.cpp b/cppcache/src/ThinClientLocatorHelper.cpp index cb26224..611ff98 100644 --- a/cppcache/src/ThinClientLocatorHelper.cpp +++ b/cppcache/src/ThinClientLocatorHelper.cpp @@ -95,11 +95,12 @@ GfErrType ThinClientLocatorHelper::getAllServers( if (!conn->send( reinterpret_cast<char*>(const_cast<uint8_t*>(data.getBuffer())), - data.getBufferLength())) { + data.getBufferLength(), m_poolDM->getReadTimeout())) { continue; } char buff[BUFF_SIZE]; - const auto receivedLength = conn->receive(buff); + const auto receivedLength = + conn->receive(buff, m_poolDM->getReadTimeout()); if (!receivedLength) { continue; @@ -126,6 +127,8 @@ GfErrType ThinClientLocatorHelper::getAllServers( LOGFINE("Exception while querying locator: %s: %s", excp.getName().c_str(), excp.what()); continue; + } catch (...) { + continue; } } return GF_NOERR; @@ -175,11 +178,12 @@ GfErrType ThinClientLocatorHelper::getEndpointForNewCallBackConn( if (!conn->send( reinterpret_cast<char*>(const_cast<uint8_t*>(data.getBuffer())), - data.getBufferLength())) { + data.getBufferLength(), m_poolDM->getReadTimeout())) { continue; } char buff[BUFF_SIZE]; - const auto receivedLength = conn->receive(buff); + const auto receivedLength = + conn->receive(buff, m_poolDM->getReadTimeout()); if (!receivedLength) { continue; @@ -205,6 +209,8 @@ GfErrType ThinClientLocatorHelper::getEndpointForNewCallBackConn( LOGFINE("Exception while querying locator: %s: %s", excp.getName().c_str(), excp.what()); continue; + } catch (...) { + continue; } } throw NoAvailableLocatorsException("Unable to query any locators"); @@ -263,11 +269,12 @@ GfErrType ThinClientLocatorHelper::getEndpointForNewFwdConn( } if (!conn->send( reinterpret_cast<char*>(const_cast<uint8_t*>(data.getBuffer())), - data.getBufferLength())) { + data.getBufferLength(), m_poolDM->getReadTimeout())) { continue; } char buff[BUFF_SIZE]; - const auto receivedLength = conn->receive(buff); + const auto receivedLength = + conn->receive(buff, m_poolDM->getReadTimeout()); if (!receivedLength) { continue; // return GF_EUNDEF; @@ -339,12 +346,13 @@ GfErrType ThinClientLocatorHelper::updateLocators( data.writeObject(request); if (!conn->send( reinterpret_cast<char*>(const_cast<uint8_t*>(data.getBuffer())), - data.getBufferLength())) { + data.getBufferLength(), m_poolDM->getReadTimeout())) { conn = nullptr; continue; } char buff[BUFF_SIZE]; - const auto receivedLength = conn->receive(buff); + const auto receivedLength = + conn->receive(buff, m_poolDM->getReadTimeout()); if (!receivedLength) { continue; diff --git a/cppcache/src/ThinClientLocatorHelper.hpp b/cppcache/src/ThinClientLocatorHelper.hpp index 1471592..0034b86 100644 --- a/cppcache/src/ThinClientLocatorHelper.hpp +++ b/cppcache/src/ThinClientLocatorHelper.hpp @@ -39,6 +39,7 @@ namespace client { class ThinClientPoolDM; class Connector; +class TcrConnection; class ThinClientLocatorHelper { public: diff --git a/cppcache/src/ThinClientPoolDM.cpp b/cppcache/src/ThinClientPoolDM.cpp index 5602828..c4909ec 100644 --- a/cppcache/src/ThinClientPoolDM.cpp +++ b/cppcache/src/ThinClientPoolDM.cpp @@ -404,6 +404,7 @@ void ThinClientPoolDM::manageConnections(std::atomic<bool>& isRunning) { } } } + LOGFINE("ThinClientPoolDM: ending manageConnections thread"); } @@ -432,8 +433,9 @@ void ThinClientPoolDM::cleanStaleConnections(std::atomic<bool>& isRunning) { removelist.push_back(conn); } else if (conn) { auto nextIdle = - _idle - std::chrono::duration_cast<std::chrono::milliseconds>( - TcrConnection::clock::now() - conn->getLastAccessed()); + _idle - + std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now() - conn->getLastAccessed()); if (nextIdle > std::chrono::seconds::zero() && nextIdle < _nextIdle) { _nextIdle = nextIdle; } @@ -452,7 +454,10 @@ void ThinClientPoolDM::cleanStaleConnections(std::atomic<bool>& isRunning) { iter != removelist.end(); ++iter) { TcrConnection* conn = *iter; if (replaceCount <= 0) { - GF_SAFE_DELETE_CON(conn); + try { + GF_SAFE_DELETE_CON(conn); + } catch (...) { + } removeEPConnections(1, false); getStats().incLoadCondDisconnects(); LOGDEBUG("Removed a connection"); @@ -463,21 +468,28 @@ void ThinClientPoolDM::cleanStaleConnections(std::atomic<bool>& isRunning) { /*hasExpired(conn) ? nullptr :*/ conn); if (newConn) { auto nextIdle = - _idle - std::chrono::duration_cast<std::chrono::milliseconds>( - TcrConnection::clock::now() - conn->getLastAccessed()); + _idle - + std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now() - conn->getLastAccessed()); if (nextIdle > std::chrono::seconds::zero() && nextIdle < _nextIdle) { _nextIdle = nextIdle; } put(newConn, false); if (newConn != conn) { - GF_SAFE_DELETE_CON(conn); + try { + GF_SAFE_DELETE_CON(conn); + } catch (...) { + } removeEPConnections(1, false); getStats().incLoadCondDisconnects(); LOGDEBUG("Removed a connection"); } } else { if (hasExpired(conn)) { - GF_SAFE_DELETE_CON(conn); + try { + GF_SAFE_DELETE_CON(conn); + } catch (...) { + } removeEPConnections(1, false); getStats().incLoadCondDisconnects(); LOGDEBUG("Removed a connection"); @@ -486,7 +498,7 @@ void ThinClientPoolDM::cleanStaleConnections(std::atomic<bool>& isRunning) { auto nextIdle = _idle - std::chrono::duration_cast<std::chrono::milliseconds>( - TcrConnection::clock::now() - conn->getLastAccessed()); + std::chrono::steady_clock::now() - conn->getLastAccessed()); if (nextIdle > std::chrono::seconds::zero() && nextIdle < _nextIdle) { _nextIdle = nextIdle; } @@ -1444,7 +1456,10 @@ GfErrType ThinClientPoolDM::sendSyncRequest( request.getMessageType() == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP); if (conn) { - GF_SAFE_DELETE_CON(conn); + try { + GF_SAFE_DELETE_CON(conn); + } catch (...) { + } } excludeServers.insert(ServerLocation(ep->name())); if (error == GF_IOERR) { @@ -2037,17 +2052,20 @@ void ThinClientPoolDM::pingServerLocal() { void ThinClientPoolDM::updateLocatorList(std::atomic<bool>& isRunning) { LOGFINE("Starting updateLocatorList thread for pool %s", m_poolName.c_str()); + while (isRunning) { m_updateLocatorListSema.acquire(); if (isRunning && !m_connManager.isNetDown()) { (m_locHelper)->updateLocators(getServerGroup()); } } + LOGFINE("Ending updateLocatorList thread for pool %s", m_poolName.c_str()); } void ThinClientPoolDM::pingServer(std::atomic<bool>& isRunning) { LOGFINE("Starting ping thread for pool %s", m_poolName.c_str()); + while (isRunning) { m_pingSema.acquire(); if (isRunning && !m_connManager.isNetDown()) { @@ -2055,6 +2073,7 @@ void ThinClientPoolDM::pingServer(std::atomic<bool>& isRunning) { m_pingSema.acquire(); } } + LOGFINE("Ending ping thread for pool %s", m_poolName.c_str()); }
