This is an automated email from the ASF dual-hosted git repository.

bbender pushed a commit to branch feature/asio
in repository https://gitbox.apache.org/repos/asf/geode-native.git

commit 3c8c9b3e471653448dc1ad99ce842663f8bf4601
Author: Matthew Reddington <[email protected]>
AuthorDate: Mon Aug 10 15:31:24 2020 -0700

    Shutdown bugs.
---
 cppcache/src/Connector.hpp               |  15 +-
 cppcache/src/DistributedSystem.hpp       |   2 -
 cppcache/src/TcpConn.cpp                 | 260 ++++++++++++++++++++++-----
 cppcache/src/TcpConn.hpp                 |   4 +-
 cppcache/src/TcpSslConn.cpp              |   6 +-
 cppcache/src/TcpSslConn.hpp              |   4 +-
 cppcache/src/TcrConnection.cpp           | 289 +++++++++++++++----------------
 cppcache/src/TcrConnection.hpp           |  53 +-----
 cppcache/src/TcrEndpoint.cpp             |  11 +-
 cppcache/src/ThinClientBaseDM.hpp        |   1 +
 cppcache/src/ThinClientLocatorHelper.cpp |  24 ++-
 cppcache/src/ThinClientLocatorHelper.hpp |   1 +
 cppcache/src/ThinClientPoolDM.cpp        |  37 +++-
 13 files changed, 428 insertions(+), 279 deletions(-)

diff --git a/cppcache/src/Connector.hpp b/cppcache/src/Connector.hpp
index e308f9d..5bef232 100644
--- a/cppcache/src/Connector.hpp
+++ b/cppcache/src/Connector.hpp
@@ -71,7 +71,8 @@ class APACHE_GEODE_EXPORT Connector {
    * @exception  GeodeIOException, TimeoutException, IllegalArgumentException,
    * OutOfMemoryException.
    */
-  virtual size_t receive(char *b, size_t len) = 0;
+  virtual size_t receive(char *b, size_t len,
+                         std::chrono::milliseconds timeout) = 0;
 
   /**
    * Writes <code>len</code> bytes from the specified byte array
@@ -83,7 +84,8 @@ class APACHE_GEODE_EXPORT Connector {
    * @return     the actual number of bytes written.
    * @exception  GeodeIOException, TimeoutException, IllegalArgumentException.
    */
-  virtual size_t send(const char *b, size_t len) = 0;
+  virtual size_t send(const char *b, size_t len,
+                      std::chrono::milliseconds timeout) = 0;
 
   /**
    * Returns local port for this TCP connection
@@ -100,8 +102,9 @@ class APACHE_GEODE_EXPORT Connector {
    * @exception GeodeIOException, TimeoutException
    */
   template <typename T, size_t size>
-  size_t send(const T (&array)[size]) {
-    return send(reinterpret_cast<const char *>(array), sizeof(T) * size);
+  size_t send(const T (&array)[size], std::chrono::milliseconds timeout) {
+    return send(reinterpret_cast<const char *>(array), sizeof(T) * size,
+                timeout);
   }
 
   /**
@@ -114,8 +117,8 @@ class APACHE_GEODE_EXPORT Connector {
    * @exception GeodeIOException, TimeoutException
    */
   template <typename T, size_t size>
-  size_t receive(T (&array)[size]) {
-    return receive(reinterpret_cast<char *>(array), sizeof(T) * size);
+  size_t receive(T (&array)[size], std::chrono::milliseconds timeout) {
+    return receive(reinterpret_cast<char *>(array), sizeof(T) * size, timeout);
   }
 };
 }  // namespace client
diff --git a/cppcache/src/DistributedSystem.hpp b/cppcache/src/DistributedSystem.hpp
index a03ea6c..3b080b9 100644
--- a/cppcache/src/DistributedSystem.hpp
+++ b/cppcache/src/DistributedSystem.hpp
@@ -48,7 +48,6 @@ namespace client {
 class SystemProperties;
 class DistributedSystemImpl;
 class CacheRegionHelper;
-class TcrConnection;
 
 class APACHE_GEODE_EXPORT DistributedSystem {
   /**
@@ -108,7 +107,6 @@ class APACHE_GEODE_EXPORT DistributedSystem {
 
   friend class CacheRegionHelper;
   friend class DistributedSystemImpl;
-  friend class TcrConnection;
 };
 }  // namespace client
 }  // namespace geode
diff --git a/cppcache/src/TcpConn.cpp b/cppcache/src/TcpConn.cpp
index 86f9a32..6ae116b 100644
--- a/cppcache/src/TcpConn.cpp
+++ b/cppcache/src/TcpConn.cpp
@@ -20,6 +20,7 @@
 #include <iomanip>
 #include <iostream>
 
+#include <boost/optional.hpp>
 #include <boost/system/error_code.hpp>
 #include <boost/system/system_error.hpp>
 
@@ -109,12 +110,51 @@ TcpConn::TcpConn(const std::string ipaddr,
           connect_timeout, maxBuffSizePool} {}
 
 TcpConn::TcpConn(const std::string host, uint16_t port,
-                 std::chrono::microseconds /*connect_timeout*/,
-                 int32_t maxBuffSizePool)
+                 std::chrono::microseconds timeout, int32_t maxBuffSizePool)
     : socket_{io_context_} {
-  // We must connect first so we have a valid file descriptor to set options on.
-  boost::asio::connect(socket_, boost::asio::ip::tcp::resolver(io_context_)
-                                    .resolve(host, std::to_string(port)));
+  boost::optional<boost::system::error_code> connect_result, timer_result;
+  boost::asio::deadline_timer connect_deadline{io_context_};
+
+  try {
+    // We must connect first so we have a valid file descriptor to set options
+    // on.
+    boost::asio::async_connect(
+        socket_,
+        boost::asio::ip::tcp::resolver(io_context_)
+            .resolve(host, std::to_string(port)),
+        [&connect_result](const boost::system::error_code &ec,
+                          const boost::asio::ip::tcp::endpoint) -> bool {
+          connect_result.reset(ec);
+          return true;
+        });
+
+    connect_deadline.expires_from_now(
+        boost::posix_time::milliseconds(timeout.count()));
+    connect_deadline.async_wait(
+        [&timer_result](const boost::system::error_code &ec) {
+          if (ec) {
+            timer_result.reset(ec);
+          }
+        });
+
+    io_context_.reset();
+    while (io_context_.run_one()) {
+      if (timer_result) {
+        socket_.cancel();
+      }
+      if (connect_result) {
+        connect_deadline.cancel();
+      }
+    }
+  } catch (...) {
+    std::cout << "Throwing an unexpected connect exception\n";
+    throw;
+  }
+
+  if (connect_result && *connect_result) {
+    std::cout << "Throwing a connect exception\n";
+    throw *connect_result;
+  }
 
   std::stringstream ss;
   ss << "Connected " << socket_.local_endpoint() << " -> "
@@ -161,50 +201,182 @@ TcpConn::TcpConn(const std::string ipaddr,
 
 TcpConn::~TcpConn() {
   std::stringstream ss;
-  ss << "Disconnected " << socket_.local_endpoint() << " -> "
-     << socket_.remote_endpoint();
+
+  try {
+    ss << "Disconnected " << socket_.local_endpoint() << " -> "
+       << socket_.remote_endpoint();
+
+    socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_both);
+
+  } catch (...) {
+    ss = std::stringstream{};
+
+    ss << "Closed socket " << socket_.local_endpoint();
+  }
+
+  socket_.close();
+
   LOGFINE(ss.str());
 }
 
-size_t TcpConn::receive(char *buff, size_t len) {
-  auto start = std::chrono::system_clock::now();
-
-  return boost::asio::read(socket_, boost::asio::buffer(buff, len),
-                           [len, start](boost::system::error_code &ec,
-                                        const std::size_t n) -> std::size_t {
-                             if (ec && ec != boost::asio::error::eof) {
-                               // Quit if we encounter an error.
-                               // Defer EOF to timeout.
-                               return 0;
-                             } else if (start + std::chrono::milliseconds(25) <=
-                                        std::chrono::system_clock::now()) {
-                               // Sometimes we don't know how much data to
-                               // expect, so we're reading into an oversized
-                               // buffer without knowing when to quit other than
-                               // by timeout. Typically, if we timeout, we also
-                               // have an EOF, meaning the connection is likely
-                               // broken and will have to be closed. But if we
-                               // have bytes, we may have just done a
-                               // dumb/blind/hail mary receive, so defer broken
-                               // connection handling until the next IO
-                               // operation.
-                               if (n) {
-                                 // This prevents the timeout from being an
-                                 // error condition.
-                                 ec = boost::system::error_code{};
-                               }
-                               // But if n == 0 when we timeout, it's just a
-                               // broken connection.
-
-                               return 0;
-                             }
-
-                             return len - n;
-                           });
+size_t TcpConn::receive(char *buff, const size_t len,
+                        std::chrono::milliseconds timeout) {
+  std::stringstream ss;
+  ss << "Receiving " << len << " bytes from " << socket_.remote_endpoint()
+     << " -> " << socket_.local_endpoint();
+  LOGDEBUG(ss.str());
+
+  boost::optional<boost::system::error_code> timer_result, read_result;
+  std::size_t bytes_read = 0;
+
+  try {
+    // Here we prep the Asio subsystem for a read operation with the completion
+    // condition below.
+    boost::asio::async_read(
+        socket_, boost::asio::buffer(buff, len),
+        [&read_result, &bytes_read, len](const boost::system::error_code &ec,
+                                         const size_t n) -> size_t {
+          bytes_read = n;
+
+          // Aborts come from timeouts or manual interrupts, as seen below in
+          // the while loop. If we timeout and haven't read anything, the
+          // connection is probably broken. A broken pipe is indicated by an
+          // EOF.
+          if (ec == boost::asio::error::operation_aborted && 0 == n) {
+            read_result.reset(
+                boost::system::error_code{boost::asio::error::eof});
+            return 0;
+          }
+          // If we timeout and there are bytes read, that isn't necessarily an
+          // error; Asio presumes it's meant to fill a fixed size buffer
+          // exactly. The buffer may simply be too big for an expected response
+          // but of an unknown size.
+          //
+          // EOF itself occurs when there is no data available on the socket at
+          // the time of the read. It may simply imply data has yet to arrive.
+          // Do nothing. Defer to timeout rather than assume a broken
+          // connection.
+          //
+          // For every other error condition, including a timeout with data,
+          // complete the operation.
+          else if (ec && ec != boost::asio::error::eof &&
+                   ec != boost::asio::error::try_again) {
+            read_result.reset(ec);
+            return 0;
+          }
+          // Once the buffer is filled, indicate success, regardless the error
+          // condition on the socket. Defer to the next receive operation to
+          // handle that eventuality.
+          else if (n == len) {
+            read_result.reset(boost::system::error_code{});
+            return 0;
+          }
+
+          // As the last read was successful, continue filling the fixed size
+          // buffer.
+          return len - n;
+        });
+
+    // This timer will abort the operation after the timeout period, and that
+    // will be indicated within the completion handler above.
+    boost::asio::deadline_timer read_deadline{io_context_};
+    read_deadline.expires_from_now(
+        boost::posix_time::milliseconds(timeout.count()));
+    read_deadline.async_wait(
+        [&timer_result](const boost::system::error_code &ec) {
+          if (ec) {
+            timer_result.reset(ec);
+          }
+        });
+
+    // Run until the context enters the stopped state.
+    io_context_.reset();
+    while (io_context_.run_one()) {
+      // If something went wrong with the timer, abort the read.
+      // This will result in an aborted read result.
+      if (timer_result) {
+        socket_.cancel();
+      }
+      if (read_result) {
+        read_deadline.cancel();
+      }
+    }
+  } catch (...) {
+    std::cout << "Throwing an unexpected read exception\n";
+    throw;
+  }
+
+  if (read_result && *read_result) {
+    std::cout << "Throwing a read exception\n";
+    throw *read_result;
+  }
+
+  return bytes_read;
 }
 
-size_t TcpConn::send(const char *buff, size_t len) {
-  return boost::asio::write(socket_, boost::asio::buffer(buff, len));
+size_t TcpConn::send(const char *buff, const size_t len,
+                     std::chrono::milliseconds timeout) {
+  std::stringstream ss;
+  ss << "Sending " << len << " bytes from " << socket_.local_endpoint()
+     << " -> " << socket_.remote_endpoint();
+  LOGDEBUG(ss.str());
+
+  boost::optional<boost::system::error_code> timer_result, write_result;
+  std::size_t bytes_written = 0;
+
+  try {
+    boost::asio::async_write(
+        socket_, boost::asio::buffer(buff, len),
+        [&write_result, &bytes_written, len](
+            const boost::system::error_code &ec, const size_t n) -> size_t {
+          bytes_written = n;
+
+          if (ec == boost::asio::error::operation_aborted && 0 == n) {
+            write_result.reset(
+                boost::system::error_code{boost::asio::error::eof});
+            return 0;
+          } else if (ec && ec != boost::asio::error::eof &&
+                     ec != boost::asio::error::try_again) {
+            write_result.reset(ec);
+            return 0;
+          } else if (n == len) {
+            write_result.reset(boost::system::error_code{});
+            return 0;
+          }
+
+          return len - n;
+        });
+
+    boost::asio::deadline_timer write_deadline{io_context_};
+    write_deadline.expires_from_now(
+        boost::posix_time::milliseconds(timeout.count()));
+    write_deadline.async_wait(
+        [&timer_result](const boost::system::error_code &ec) {
+          if (ec) {
+            timer_result.reset(ec);
+          }
+        });
+
+    io_context_.reset();
+    while (io_context_.run_one()) {
+      if (timer_result) {
+        socket_.cancel();
+      }
+      if (write_result) {
+        write_deadline.cancel();
+      }
+    }
+  } catch (...) {
+    std::cout << "Throwing an unexpected write exception\n";
+    throw;
+  }
+
+  if (write_result && *write_result) {
+    std::cout << "Throwing a write exception\n";
+    throw *write_result;
+  }
+
+  return bytes_written;
 }
 
 //  Return the local port for this TCP connection.
diff --git a/cppcache/src/TcpConn.hpp b/cppcache/src/TcpConn.hpp
index fef274a..50f6552 100644
--- a/cppcache/src/TcpConn.hpp
+++ b/cppcache/src/TcpConn.hpp
@@ -30,8 +30,8 @@ namespace apache {
 namespace geode {
 namespace client {
 class APACHE_GEODE_EXPORT TcpConn : public Connector {
-  size_t receive(char* buff, size_t len) override;
-  size_t send(const char* buff, size_t len) override;
+  size_t receive(char*, size_t, std::chrono::milliseconds) override;
+  size_t send(const char*, size_t, std::chrono::milliseconds) override;
 
   uint16_t getPort() override final;
 
diff --git a/cppcache/src/TcpSslConn.cpp b/cppcache/src/TcpSslConn.cpp
index 608484b..2e1d035 100644
--- a/cppcache/src/TcpSslConn.cpp
+++ b/cppcache/src/TcpSslConn.cpp
@@ -91,7 +91,8 @@ TcpSslConn::~TcpSslConn() {
   LOGFINE(ss.str());
 }
 
-size_t TcpSslConn::receive(char *buff, size_t len) {
+size_t TcpSslConn::receive(char *buff, const size_t len,
+                           std::chrono::milliseconds) {
   auto start = std::chrono::system_clock::now();
 
   return boost::asio::read(*socket_stream_, boost::asio::buffer(buff, len),
@@ -128,7 +129,8 @@ size_t TcpSslConn::receive(char *buff, size_t len) {
                            });
 }
 
-size_t TcpSslConn::send(const char *buff, size_t len) {
+size_t TcpSslConn::send(const char *buff, const size_t len,
+                        std::chrono::milliseconds) {
   return boost::asio::write(*socket_stream_, boost::asio::buffer(buff, len));
 }
 
diff --git a/cppcache/src/TcpSslConn.hpp b/cppcache/src/TcpSslConn.hpp
index d5ca292..e2ddf0b 100644
--- a/cppcache/src/TcpSslConn.hpp
+++ b/cppcache/src/TcpSslConn.hpp
@@ -29,8 +29,8 @@ namespace geode {
 namespace client {
 
 class TcpSslConn : public TcpConn {
-  size_t receive(char* buff, size_t len) override final;
-  size_t send(const char* buff, size_t len) override final;
+  size_t receive(char*, size_t, std::chrono::milliseconds) override final;
+  size_t send(const char*, size_t, std::chrono::milliseconds) override final;
 
   using ssl_stream_type =
       boost::asio::ssl::stream<boost::asio::ip::tcp::socket&>;
diff --git a/cppcache/src/TcrConnection.cpp b/cppcache/src/TcrConnection.cpp
index 14bda1a..7d098ba 100644
--- a/cppcache/src/TcrConnection.cpp
+++ b/cppcache/src/TcrConnection.cpp
@@ -34,34 +34,85 @@
 #include "Utils.hpp"
 #include "Version.hpp"
 
-namespace apache {
-namespace geode {
-namespace client {
-
-const int HEADER_LENGTH = 17;
-const int CHUNK_HEADER_LENGTH = 5;
-const int8_t LAST_CHUNK_MASK = 0x1;
-const int64_t INITIAL_CONNECTION_ID = 26739;
-
 #define throwException(ex)                            \
   do {                                                \
     LOGFINEST(ex.getName() + ": " + ex.getMessage()); \
     throw ex;                                         \
   } while (0)
 
+namespace {
+bool useReplyTimeout(const apache::geode::client::TcrMessage& request) {
+  switch (request.getMessageType()) {
+    case apache::geode::client::TcrMessage::QUERY:
+    case apache::geode::client::TcrMessage::QUERY_WITH_PARAMETERS:
+    case apache::geode::client::TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE:
+    case apache::geode::client::TcrMessage::GETDURABLECQS_MSG_TYPE:
+    case apache::geode::client::TcrMessage::EXECUTE_FUNCTION:
+    case apache::geode::client::TcrMessage::EXECUTE_REGION_FUNCTION:
+    case apache::geode::client::TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP:
+      return true;
+    default:
+      break;
+  }
+
+  return false;
+}
+
+int expiryTimeVariancePercentage() {
+  auto nowTimePoint = std::chrono::steady_clock::now().time_since_epoch();
+  auto now_ms =
+      std::chrono::duration_cast<std::chrono::milliseconds>(nowTimePoint)
+          .count();
+  auto now_s =
+      std::chrono::duration_cast<std::chrono::seconds>(nowTimePoint).count();
+
+  srand(static_cast<unsigned int>((now_s * 1000) + (now_ms / 1000)));
+
+  const int numbers = 21;
+  int random = rand() % numbers + 1;
+
+  if (random > 10) {
+    random = random - numbers;
+  }
+  return random;
+}
+
+const int HEADER_LENGTH = 17;
+const int CHUNK_HEADER_LENGTH = 5;
+const int8_t LAST_CHUNK_MASK = 0x1;
+const int64_t INITIAL_CONNECTION_ID = 26739;
+
 struct FinalizeProcessChunk {
  private:
-  TcrMessage& m_reply;
+  apache::geode::client::TcrMessage& m_reply;
   uint16_t m_endpointMemId;
 
  public:
-  FinalizeProcessChunk(TcrMessageReply& reply, uint16_t endpointMemId)
+  FinalizeProcessChunk(apache::geode::client::TcrMessageReply& reply,
+                       uint16_t endpointMemId)
       : m_reply(reply), m_endpointMemId(endpointMemId) {}
   ~FinalizeProcessChunk() noexcept(false) {
     // Enqueue a nullptr chunk indicating a wait for processing to complete.
     m_reply.processChunk(std::vector<uint8_t>(), 0, m_endpointMemId);
   }
 };
+}  // namespace
+
+namespace apache {
+namespace geode {
+namespace client {
+
+TcrConnection::TcrConnection(const TcrConnectionManager& connectionManager)
+    : connectionId(0),
+      m_connectionManager(connectionManager),
+      m_expiryTimeVariancePercentage{expiryTimeVariancePercentage()},
+      m_hasServerQueue(NON_REDUNDANT_SERVER),
+      m_queueSize(0),
+      m_port(0),
+      m_chunksProcessSema(0),
+      m_isBeingUsed(false),
+      m_isUsed(0),
+      m_poolDM(nullptr) {}
 
 bool TcrConnection::initTcrConnection(
     std::shared_ptr<TcrEndpoint> endpointObj,
@@ -72,9 +123,8 @@ bool TcrConnection::initTcrConnection(
   m_poolDM = dynamic_cast<ThinClientPoolDM*>(m_endpointObj->getPoolHADM());
   m_hasServerQueue = NON_REDUNDANT_SERVER;
   m_queueSize = 0;
-  m_creationTime = clock::now();
+  m_lastAccessed = m_creationTime = std::chrono::steady_clock::now();
   connectionId = INITIAL_CONNECTION_ID;
-  m_lastAccessed = clock::now();
   auto cacheImpl = m_poolDM->getConnectionManager().getCacheImpl();
   const auto& distributedSystem = cacheImpl->getDistributedSystem();
   const auto& sysProp = distributedSystem.getSystemProperties();
@@ -246,7 +296,7 @@ bool TcrConnection::initTcrConnection(
           endpointObj->name().c_str(),
           isClientNotification ? (isSecondary ? "secondary " : "primary ") : "",
           isClientNotification ? "subscription" : "client");
-  ConnErrType error = sendData(data, msgLength, connectTimeout, false);
+  ConnErrType error = sendData(data, msgLength, connectTimeout);
 
   if (error == CONN_NOERR) {
     auto acceptanceCode = readHandshakeData(1, connectTimeout);
@@ -290,12 +340,10 @@ bool TcrConnection::initTcrConnection(
     ///////////////////////////////////
     ////////////////////////// 3. Only when handshake is for subscription
     ///////////////////////////////////
-    if (m_poolDM != nullptr) {
-      if ((m_hasServerQueue == PRIMARY_SERVER ||
-           m_hasServerQueue == NON_REDUNDANT_SERVER) &&
-          isClientNotification) {
-        m_poolDM->setPrimaryServerQueueSize(queueSize);
-      }
+    if ((m_hasServerQueue == PRIMARY_SERVER ||
+         m_hasServerQueue == NON_REDUNDANT_SERVER) &&
+        isClientNotification) {
+      m_poolDM->setPrimaryServerQueueSize(queueSize);
     }
 
     if (!isClientNotification) {
@@ -317,7 +365,7 @@ bool TcrConnection::initTcrConnection(
     }
 
     auto recvMsgLenBytes = readHandshakeData(2, connectTimeout);
-    auto dataInput3 = m_connectionManager->getCacheImpl()->createDataInput(
+    auto dataInput3 = m_connectionManager.getCacheImpl()->createDataInput(
         reinterpret_cast<const uint8_t*>(recvMsgLenBytes.data()),
         recvMsgLenBytes.size());
     uint16_t recvMsgLen2 = dataInput3.readInt16();
@@ -325,7 +373,7 @@ bool TcrConnection::initTcrConnection(
 
     if (!isClientNotification) {
       auto deltaEnabledMsg = readHandshakeData(1, connectTimeout);
-      auto di = m_connectionManager->getCacheImpl()->createDataInput(
+      auto di = m_connectionManager.getCacheImpl()->createDataInput(
           reinterpret_cast<const uint8_t*>(deltaEnabledMsg.data()), 1);
       ThinClientBaseDM::setDeltaEnabledOnServer(di.readBoolean());
     }
@@ -415,7 +463,7 @@ bool TcrConnection::initTcrConnection(
 void TcrConnection::createConnection(const char* endpoint,
                                      std::chrono::microseconds connectTimeout,
                                      int32_t maxBuffSizePool) {
-  auto& systemProperties = m_connectionManager->getCacheImpl()
+  auto& systemProperties = m_connectionManager.getCacheImpl()
                                ->getDistributedSystem()
                                .getSystemProperties();
 
@@ -429,94 +477,47 @@ void TcrConnection::createConnection(const char* endpoint,
   }
 }
 
-/* The timeout behaviour for different methods is as follows:
- * receive():
- *   Header: default timeout
- *   Body: default timeout
- * sendRequest()/sendRequestForChunkedResponse():
- *  default timeout during send; for receive:
- *   Header: default timeout * default timeout retries to handle large payload
- *           if a timeout other than default timeout is specified then
- *           that is used instead
- *   Body: default timeout
- */
 ConnErrType TcrConnection::receiveData(
-    char* buffer, size_t length, std::chrono::microseconds operation_window,
-    bool /*checkConnected*/, bool isNotificationMessage) {
-  auto startLen = length;
-
-  while (length > 0 && operation_window > decltype(operation_window)::zero()) {
-    const auto start = std::chrono::system_clock::now();
-
-    std::size_t readBytes = 0;
-
-    try {
-      readBytes = m_conn->receive(buffer, length);
-    } catch (boost::system::system_error& ex) {
-      switch (ex.code().value()) {
-        case boost::asio::error::eof:
-          LOGDEBUG("Connection:receive: \"%s\"", ex.what());
-        case boost::asio::error::try_again:
-          break;
-        default: {
-          LOGERROR("Connection:receive: \"%s\"", ex.what());
-          return CONN_IOERR;
-        }
-      }
-    }
-
-    length -= readBytes;
-    buffer += readBytes;
-
-    if (m_poolDM != nullptr) {
-      m_poolDM->getStats().incReceivedBytes(static_cast<int64_t>(readBytes));
-    }
-
-    if ((length == startLen) && isNotificationMessage) {  // no data read
-      break;
+    char* buffer, const size_t length,
+    const std::chrono::microseconds timeout) {
+  try {
+    const auto readBytes = m_conn->receive(
+        buffer, length,
+        std::chrono::duration_cast<std::chrono::milliseconds>(timeout));
+
+    m_poolDM->getStats().incReceivedBytes(static_cast<int64_t>(readBytes));
+  } catch (boost::system::system_error& ex) {
+    switch (ex.code().value()) {
+      case boost::asio::error::eof:
+        return CONN_NODATA;
+      case boost::asio::error::operation_aborted:
+        return CONN_TIMEOUT;
+      default:
+        break;
     }
-
-    operation_window -= std::chrono::duration_cast<decltype(operation_window)>(
-        std::chrono::system_clock::now() - start);
-  }
-  //  Postconditions for checking bounds.
-  if (length == 0) {
-    return CONN_NOERR;
-  } else if (length == startLen) {
-    return CONN_NODATA;
+    return CONN_IOERR;
   }
 
-  return CONN_TIMEOUT;
+  return CONN_NOERR;
 }
 
 ConnErrType TcrConnection::sendData(const char* buffer, size_t length,
-                                    std::chrono::microseconds operation_window,
-                                    bool /*checkConnected*/) {
-  while (length > 0 && operation_window > std::chrono::microseconds::zero()) {
-    const auto start = std::chrono::system_clock::now();
-
-    std::size_t sentBytes = 0;
-    try {
-      sentBytes = m_conn->send(buffer, length);
-    } catch (boost::system::system_error& ex) {
-      switch (ex.code().value()) {
-        case boost::asio::error::try_again:
-          break;
-        default: {
-          LOGERROR("Connection:send: \"%s\"", ex.what());
-          return CONN_IOERR;
-        }
-      }
+                                    std::chrono::microseconds timeout) {
+  try {
+    m_conn->send(
+        buffer, length,
+        std::chrono::duration_cast<std::chrono::milliseconds>(timeout));
+  } catch (boost::system::system_error& ex) {
+    switch (ex.code().value()) {
+      case boost::asio::error::operation_aborted:
+        return CONN_TIMEOUT;
+      default:
+        break;
     }
-
-    length -= sentBytes;
-    buffer += sentBytes;
-
-    operation_window -= std::chrono::duration_cast<decltype(operation_window)>(
-        std::chrono::system_clock::now() - start);
+    return CONN_IOERR;
   }
 
-  return length == 0 ? CONN_NOERR : CONN_TIMEOUT;
+  return CONN_NOERR;
 }
 
 char* TcrConnection::sendRequest(const char* buffer, size_t len,
@@ -559,17 +560,6 @@ void TcrConnection::sendRequestForChunkedResponse(
   }
 }
 
-bool TcrConnection::useReplyTimeout(const TcrMessage& request) const {
-  auto messageType = request.getMessageType();
-  return ((messageType == TcrMessage::QUERY) ||
-          (messageType == TcrMessage::QUERY_WITH_PARAMETERS) ||
-          (messageType == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE) ||
-          (messageType == TcrMessage::GETDURABLECQS_MSG_TYPE) ||
-          (messageType == TcrMessage::EXECUTE_FUNCTION) ||
-          (messageType == TcrMessage::EXECUTE_REGION_FUNCTION) ||
-          (messageType == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP));
-}
-
 std::chrono::microseconds TcrConnection::sendWithTimeouts(
     const char* data, size_t len, std::chrono::microseconds sendTimeout,
     std::chrono::microseconds receiveTimeout) {
@@ -608,29 +598,20 @@ bool TcrConnection::replyHasResult(const TcrMessage& request,
 
 void TcrConnection::send(const char* buffer, size_t len,
                          std::chrono::microseconds sendTimeoutSec, bool) {
-  // LOGINFO("TcrConnection::send: [%p] sending request to endpoint %s;",
-  //:  this, m_endpointObj->name().c_str());
-
   LOGDEBUG(
       "TcrConnection::send: [%p] sending request to endpoint %s; bytes: %s",
       this, m_endpointObj->name().c_str(),
       Utils::convertBytesToString(buffer, len).c_str());
 
-  ConnErrType error = sendData(buffer, len, sendTimeoutSec);
-
-  LOGFINER(
-      "TcrConnection::send: completed send request to endpoint %s "
-      "with error: %d",
-      m_endpointObj->name().c_str(), error);
-
-  if (error != CONN_NOERR) {
-    if (error == CONN_TIMEOUT) {
+  switch (sendData(buffer, len, sendTimeoutSec)) {
+    case CONN_NOERR:
+      break;
+    case CONN_TIMEOUT:
       throwException(
           TimeoutException("TcrConnection::send: connection timed out"));
-    } else {
+    default:
       throwException(
           GeodeIOException("TcrConnection::send: connection failure"));
-    }
   }
 }
 
@@ -653,8 +634,7 @@ char* TcrConnection::readMessage(size_t* recvLen,
     headerTimeout = DEFAULT_READ_TIMEOUT * DEFAULT_TIMEOUT_RETRIES;
   }
 
-  error = receiveData(msg_header, HEADER_LENGTH, headerTimeout, true,
-                      isNotificationMessage);
+  error = receiveData(msg_header, HEADER_LENGTH, headerTimeout);
 
   if (error != CONN_NOERR) {
     //  the !isNotificationMessage ensures that notification channel
@@ -689,7 +669,7 @@ char* TcrConnection::readMessage(size_t* recvLen,
       this, m_endpointObj->name().c_str(),
       Utils::convertBytesToString(msg_header, HEADER_LENGTH).c_str());
 
-  auto input = m_connectionManager->getCacheImpl()->createDataInput(
+  auto input = m_connectionManager.getCacheImpl()->createDataInput(
       reinterpret_cast<uint8_t*>(msg_header), HEADER_LENGTH);
   // ignore msgType
   input.readInt32();
@@ -714,8 +694,7 @@ char* TcrConnection::readMessage(size_t* recvLen,
   if (isNotificationMessage) {
     mesgBodyTimeout = receiveTimeoutSec * DEFAULT_TIMEOUT_RETRIES;
   }
-  error = receiveData(fullMessage + HEADER_LENGTH, msgLen, mesgBodyTimeout,
-                      true, isNotificationMessage);
+  error = receiveData(fullMessage + HEADER_LENGTH, msgLen, mesgBodyTimeout);
   if (error != CONN_NOERR) {
     delete[] fullMessage;
     //  the !isNotificationMessage ensures that notification channel
@@ -808,7 +787,7 @@ chunkedResponseHeader TcrConnection::readResponseHeader(
   chunkedResponseHeader header;
 
   auto error = receiveData(reinterpret_cast<char*>(receiveBuffer),
-                           HEADER_LENGTH, timeout, true, false);
+                           HEADER_LENGTH, timeout);
   if (error != CONN_NOERR) {
     if (error & CONN_TIMEOUT) {
       throwException(TimeoutException(
@@ -827,7 +806,7 @@ chunkedResponseHeader TcrConnection::readResponseHeader(
       m_endpointObj->name().c_str(),
       Utils::convertBytesToString(receiveBuffer, HEADER_LENGTH).c_str());
 
-  auto input = m_connectionManager->getCacheImpl()->createDataInput(
+  auto input = m_connectionManager.getCacheImpl()->createDataInput(
       receiveBuffer, HEADER_LENGTH);
   header.messageType = input.readInt32();
   header.numberOfParts = input.readInt32();
@@ -850,7 +829,7 @@ chunkHeader TcrConnection::readChunkHeader(std::chrono::microseconds timeout) {
   chunkHeader header;
 
   auto error = receiveData(reinterpret_cast<char*>(receiveBuffer),
-                           CHUNK_HEADER_LENGTH, timeout, true, false);
+                           CHUNK_HEADER_LENGTH, timeout);
   if (error != CONN_NOERR) {
     if (error & CONN_TIMEOUT) {
       throwException(TimeoutException(
@@ -869,7 +848,7 @@ chunkHeader TcrConnection::readChunkHeader(std::chrono::microseconds timeout) {
       m_endpointObj->name().c_str(),
       Utils::convertBytesToString(receiveBuffer, CHUNK_HEADER_LENGTH).c_str());
 
-  auto input = m_connectionManager->getCacheImpl()->createDataInput(
+  auto input = m_connectionManager.getCacheImpl()->createDataInput(
       receiveBuffer, CHUNK_HEADER_LENGTH);
   header.chunkLength = input.readInt32();
   header.flags = input.read();
@@ -885,7 +864,7 @@ std::vector<uint8_t> TcrConnection::readChunkBody(
     std::chrono::microseconds timeout, int32_t chunkLength) {
   std::vector<uint8_t> chunkBody(chunkLength);
   auto error = receiveData(reinterpret_cast<char*>(chunkBody.data()),
-                           chunkLength, timeout, true, false);
+                           chunkLength, timeout);
   if (error != CONN_NOERR) {
     if (error & CONN_TIMEOUT) {
       throwException(
@@ -930,7 +909,7 @@ void TcrConnection::close() {
       m_poolDM->getConnectionManager().getCacheImpl());
   try {
     if (!TcrConnectionManager::TEST_DURABLE_CLIENT_CRASH &&
-        !m_connectionManager->isNetDown()) {
+        !m_connectionManager.isNetDown()) {
       send(closeMsg->getMsgData(), closeMsg->getMsgLength(),
            std::chrono::seconds(2), false);
     }
@@ -953,7 +932,7 @@ std::vector<int8_t> TcrConnection::readHandshakeData(
     return message;
   }
   if ((error = receiveData(reinterpret_cast<char*>(message.data()), msgLength,
-                           connectTimeout, false)) != CONN_NOERR) {
+                           connectTimeout)) != CONN_NOERR) {
     m_conn.reset();
     if (error & CONN_TIMEOUT) {
       throwException(
@@ -980,7 +959,7 @@ std::shared_ptr<CacheableBytes> TcrConnection::readHandshakeRawData(
   }
   std::vector<int8_t> message(msgLength);
   if ((error = receiveData(reinterpret_cast<char*>(message.data()), msgLength,
-                           connectTimeout, false)) != CONN_NOERR) {
+                           connectTimeout)) != CONN_NOERR) {
     m_conn.reset();
     if (error & CONN_TIMEOUT) {
       throwException(
@@ -1010,13 +989,13 @@ int32_t TcrConnection::readHandshakeArraySize(
   int32_t arrayLength = static_cast<uint8_t>(arrayLenHeader[0]);
   if (static_cast<int8_t>(arrayLenHeader[0]) == -2) {
     auto arrayLengthBytes = readHandshakeData(2, connectTimeout);
-    auto dataInput2 = m_connectionManager->getCacheImpl()->createDataInput(
+    auto dataInput2 = m_connectionManager.getCacheImpl()->createDataInput(
         reinterpret_cast<const uint8_t*>(arrayLengthBytes.data()),
         arrayLengthBytes.size());
     arrayLength = dataInput2.readInt16();
   } else if (static_cast<int8_t>(arrayLenHeader[0]) == -3) {
     auto arrayLengthBytes = readHandshakeData(4, connectTimeout);
-    auto dataInput2 = m_connectionManager->getCacheImpl()->createDataInput(
+    auto dataInput2 = m_connectionManager.getCacheImpl()->createDataInput(
         reinterpret_cast<const uint8_t*>(arrayLengthBytes.data()),
         arrayLengthBytes.size());
     arrayLength = dataInput2.readInt32();
@@ -1061,7 +1040,7 @@ void TcrConnection::readHandShakeBytes(
   _GEODE_NEW(recvMessage, uint8_t[numberOfBytes]);
 
   if ((error = receiveData(reinterpret_cast<char*>(recvMessage), numberOfBytes,
-                           connectTimeout, false)) != CONN_NOERR) {
+                           connectTimeout)) != CONN_NOERR) {
     if (error & CONN_TIMEOUT) {
       _GEODE_SAFE_DELETE_ARRAY(recvMessage);
       m_conn.reset();
@@ -1087,7 +1066,7 @@ int32_t TcrConnection::readHandShakeInt(
   _GEODE_NEW(recvMessage, uint8_t[4]);
 
   if ((error = receiveData(reinterpret_cast<char*>(recvMessage), 4,
-                           connectTimeout, false)) != CONN_NOERR) {
+                           connectTimeout)) != CONN_NOERR) {
     if (error & CONN_TIMEOUT) {
       _GEODE_SAFE_DELETE_ARRAY(recvMessage);
       m_conn.reset();
@@ -1103,8 +1082,7 @@ int32_t TcrConnection::readHandShakeInt(
     }
   }
 
-  auto di =
-      m_connectionManager->getCacheImpl()->createDataInput(recvMessage, 4);
+  auto di = m_connectionManager.getCacheImpl()->createDataInput(recvMessage, 4);
   int32_t val = di.readInt32();
 
   _GEODE_SAFE_DELETE_ARRAY(recvMessage);
@@ -1117,7 +1095,7 @@ std::shared_ptr<CacheableString> TcrConnection::readHandshakeString(
   ConnErrType error = CONN_NOERR;
 
   char cstypeid;
-  if (receiveData(&cstypeid, 1, connectTimeout, false) != CONN_NOERR) {
+  if (receiveData(&cstypeid, 1, connectTimeout) != CONN_NOERR) {
     m_conn.reset();
     if (error & CONN_TIMEOUT) {
       LOGFINE("Timeout receiving string typeid");
@@ -1141,7 +1119,7 @@ std::shared_ptr<CacheableString> TcrConnection::readHandshakeString(
     }
     case DSCode::CacheableASCIIString: {
       auto lenBytes = readHandshakeData(2, connectTimeout);
-      auto lenDI = m_connectionManager->getCacheImpl()->createDataInput(
+      auto lenDI = m_connectionManager.getCacheImpl()->createDataInput(
           reinterpret_cast<const uint8_t*>(lenBytes.data()), lenBytes.size());
       length = lenDI.readInt16();
 
@@ -1164,8 +1142,8 @@ std::shared_ptr<CacheableString> TcrConnection::readHandshakeString(
   std::vector<char> recvMessage(length + 1);
   recvMessage[length] = '\0';
 
-  if ((error = receiveData(recvMessage.data(), length, connectTimeout,
-                           false)) != CONN_NOERR) {
+  if ((error = receiveData(recvMessage.data(), length, connectTimeout)) !=
+      CONN_NOERR) {
     if (error & CONN_TIMEOUT) {
       m_conn.reset();
       LOGFINE("Timeout receiving string data");
@@ -1192,7 +1170,8 @@ bool TcrConnection::hasExpired(const std::chrono::milliseconds& expiryTime) {
   }
   auto variadicExpiryTime =
       expiryTime + (expiryTime * m_expiryTimeVariancePercentage) / 100;
-  return (clock::now() - m_creationTime) > variadicExpiryTime;
+  return (std::chrono::steady_clock::now() - m_creationTime) >
+         variadicExpiryTime;
 }
 
 bool TcrConnection::isIdle(const std::chrono::milliseconds& idleTime) {
@@ -1200,12 +1179,14 @@ bool TcrConnection::isIdle(const std::chrono::milliseconds& idleTime) {
     return false;
   }
 
-  return (clock::now() - m_lastAccessed) > idleTime;
+  return (std::chrono::steady_clock::now() - m_lastAccessed) > idleTime;
 }
 
-void TcrConnection::touch() { m_lastAccessed = clock::now(); }
+void TcrConnection::touch() {
+  m_lastAccessed = std::chrono::steady_clock::now();
+}
 
-TcrConnection::time_point TcrConnection::getLastAccessed() {
+std::chrono::steady_clock::time_point TcrConnection::getLastAccessed() {
   return m_lastAccessed;
 }
 
@@ -1223,7 +1204,7 @@ uint8_t TcrConnection::getOverrides(const SystemProperties* props) {
 }
 
 void TcrConnection::updateCreationTime() {
-  m_creationTime = clock::now();
+  m_creationTime = std::chrono::steady_clock::now();
   touch();
 }
 
diff --git a/cppcache/src/TcrConnection.hpp b/cppcache/src/TcrConnection.hpp
index d3c1a7f..8ed9398 100644
--- a/cppcache/src/TcrConnection.hpp
+++ b/cppcache/src/TcrConnection.hpp
@@ -85,9 +85,6 @@ class ThinClientPoolDM;
 class TcrConnectionManager;
 class TcrConnection {
  public:
-  using clock = std::chrono::steady_clock;
-  using time_point = clock::time_point;
-
   /** Create one connection, endpoint is in format of hostname:portno
    * It will do handshake with j-server. There're 2 types of handshakes:
    * 1) handshake for request
@@ -118,36 +115,7 @@ class TcrConnection {
       bool isClientNotification = false, bool isSecondary = false,
       std::chrono::microseconds connectTimeout = DEFAULT_CONNECT_TIMEOUT);
 
-  TcrConnection(const TcrConnectionManager& connectionManager,
-                volatile const bool& isConnected)
-      : connectionId(0),
-        m_connectionManager(&connectionManager),
-        m_connected(isConnected),
-        m_conn(nullptr),
-        m_hasServerQueue(NON_REDUNDANT_SERVER),
-        m_queueSize(0),
-        m_port(0),
-        m_chunksProcessSema(0),
-        m_isBeingUsed(false),
-        m_isUsed(0),
-        m_poolDM(nullptr) {
-    auto nowTimePoint = clock::now().time_since_epoch();
-    auto now_ms =
-        std::chrono::duration_cast<std::chrono::milliseconds>(nowTimePoint)
-            .count();
-    auto now_s =
-        std::chrono::duration_cast<std::chrono::seconds>(nowTimePoint).count();
-    auto seed = (now_s * 1000) + (now_ms / 1000);
-    srand(static_cast<unsigned int>(seed));
-    int numbers = 21;
-    int random = rand() % numbers + 1;
-    if (random > 10) {
-      random = random - numbers;
-    }
-    m_expiryTimeVariancePercentage = random;
-    LOGDEBUG("m_expiryTimeVariancePercentage set to: %d",
-             m_expiryTimeVariancePercentage);
-  }
+  TcrConnection(const TcrConnectionManager& connectionManager);
 
   /* destroy the connection */
   ~TcrConnection();
@@ -287,7 +255,7 @@ class TcrConnection {
   void touch();
   bool hasExpired(const std::chrono::milliseconds& expiryTime);
   bool isIdle(const std::chrono::milliseconds& idleTime);
-  time_point getLastAccessed();
+  std::chrono::steady_clock::time_point getLastAccessed();
   void updateCreationTime();
 
   int64_t getConnectionId() {
@@ -301,12 +269,12 @@ class TcrConnection {
   }
 
   const TcrConnectionManager& getConnectionManager() {
-    return *m_connectionManager;
+    return m_connectionManager;
   }
 
  private:
   int64_t connectionId;
-  const TcrConnectionManager* m_connectionManager;
+  const TcrConnectionManager& m_connectionManager;
   int m_expiryTimeVariancePercentage = 0;
 
   std::chrono::microseconds calculateHeaderTimeout(
@@ -386,19 +354,15 @@ class TcrConnection {
    * Send data to the connection till sendTimeout
    */
   ConnErrType sendData(const char* buffer, size_t length,
-                       std::chrono::microseconds sendTimeout,
-                       bool checkConnected = true);
+                       std::chrono::microseconds sendTimeout);
 
   /**
    * Read data from the connection till receiveTimeoutSec
    */
   ConnErrType receiveData(char* buffer, size_t length,
-                          std::chrono::microseconds receiveTimeoutSec,
-                          bool checkConnected = true,
-                          bool isNotificationMessage = false);
+                          std::chrono::microseconds receiveTimeoutSec);
 
   std::shared_ptr<TcrEndpoint> m_endpointObj;
-  volatile const bool& m_connected;
   std::unique_ptr<Connector> m_conn;
   ServerQueueStatus m_hasServerQueue;
   int32_t m_queueSize;
@@ -407,8 +371,8 @@ class TcrConnection {
   // semaphore to synchronize with the chunked response processing thread
   ACE_Semaphore m_chunksProcessSema;
 
-  time_point m_creationTime;
-  time_point m_lastAccessed;
+  std::chrono::steady_clock::time_point m_creationTime;
+  std::chrono::steady_clock::time_point m_lastAccessed;
 
   // Disallow copy constructor and assignment operator.
   TcrConnection(const TcrConnection&);
@@ -416,7 +380,6 @@ class TcrConnection {
   volatile bool m_isBeingUsed;
   std::atomic<uint32_t> m_isUsed;
   ThinClientPoolDM* m_poolDM;
-  bool useReplyTimeout(const TcrMessage& request) const;
   std::chrono::microseconds sendWithTimeouts(
       const char* data, size_t len, std::chrono::microseconds sendTimeout,
       std::chrono::microseconds receiveTimeout);
diff --git a/cppcache/src/TcrEndpoint.cpp b/cppcache/src/TcrEndpoint.cpp
index a731da0..4be0d08 100644
--- a/cppcache/src/TcrEndpoint.cpp
+++ b/cppcache/src/TcrEndpoint.cpp
@@ -143,8 +143,7 @@ GfErrType TcrEndpoint::createNewConnectionWL(
     if (locked) {
       try {
         LOGFINE("TcrEndpoint::createNewConnectionWL got lock");
-        newConn =
-            new TcrConnection(m_cacheImpl->tcrConnectionManager(), m_connected);
+        newConn = new TcrConnection(m_cacheImpl->tcrConnectionManager());
         newConn->initTcrConnection(shared_from_this(), m_ports,
                                    isClientNotification, isSecondary,
                                    connectTimeout);
@@ -192,8 +191,7 @@ GfErrType TcrEndpoint::createNewConnection(
     try {
       if (newConn == nullptr) {
         if (!needtoTakeConnectLock() || !appThreadRequest) {
-          newConn = new TcrConnection(m_cacheImpl->tcrConnectionManager(),
-                                      m_connected);
+          newConn = new TcrConnection(m_cacheImpl->tcrConnectionManager());
           bool authenticate = newConn->initTcrConnection(
               shared_from_this(), m_ports, isClientNotification, isSecondary,
               connectTimeout);
@@ -1170,7 +1168,10 @@ void TcrEndpoint::triggerRedundancyThread() {
 void TcrEndpoint::closeConnection(TcrConnection*& conn) {
   conn->close();
   m_ports.erase(conn->getPort());
-  _GEODE_SAFE_DELETE(conn);
+  try {
+    _GEODE_SAFE_DELETE(conn);
+  } catch (...) {
+  }
 }
 
 void TcrEndpoint::closeConnections() {
diff --git a/cppcache/src/ThinClientBaseDM.hpp b/cppcache/src/ThinClientBaseDM.hpp
index 2920876..61c3fe3 100644
--- a/cppcache/src/ThinClientBaseDM.hpp
+++ b/cppcache/src/ThinClientBaseDM.hpp
@@ -37,6 +37,7 @@ namespace client {
 class TcrMessage;
 class ThinClientRegion;
 class TcrEndpoint;
+class TcrConnection;
 class TcrConnectionManager;
 class TcrMessageReply;
 class TcrChunkedContext;
diff --git a/cppcache/src/ThinClientLocatorHelper.cpp b/cppcache/src/ThinClientLocatorHelper.cpp
index cb26224..611ff98 100644
--- a/cppcache/src/ThinClientLocatorHelper.cpp
+++ b/cppcache/src/ThinClientLocatorHelper.cpp
@@ -95,11 +95,12 @@ GfErrType ThinClientLocatorHelper::getAllServers(
 
       if (!conn->send(
               reinterpret_cast<char*>(const_cast<uint8_t*>(data.getBuffer())),
-              data.getBufferLength())) {
+              data.getBufferLength(), m_poolDM->getReadTimeout())) {
         continue;
       }
       char buff[BUFF_SIZE];
-      const auto receivedLength = conn->receive(buff);
+      const auto receivedLength =
+          conn->receive(buff, m_poolDM->getReadTimeout());
 
       if (!receivedLength) {
         continue;
@@ -126,6 +127,8 @@ GfErrType ThinClientLocatorHelper::getAllServers(
       LOGFINE("Exception while querying locator: %s: %s",
               excp.getName().c_str(), excp.what());
       continue;
+    } catch (...) {
+      continue;
     }
   }
   return GF_NOERR;
@@ -175,11 +178,12 @@ GfErrType ThinClientLocatorHelper::getEndpointForNewCallBackConn(
 
       if (!conn->send(
               reinterpret_cast<char*>(const_cast<uint8_t*>(data.getBuffer())),
-              data.getBufferLength())) {
+              data.getBufferLength(), m_poolDM->getReadTimeout())) {
         continue;
       }
       char buff[BUFF_SIZE];
-      const auto receivedLength = conn->receive(buff);
+      const auto receivedLength =
+          conn->receive(buff, m_poolDM->getReadTimeout());
 
       if (!receivedLength) {
         continue;
@@ -205,6 +209,8 @@ GfErrType ThinClientLocatorHelper::getEndpointForNewCallBackConn(
       LOGFINE("Exception while querying locator: %s: %s",
               excp.getName().c_str(), excp.what());
       continue;
+    } catch (...) {
+      continue;
     }
   }
   throw NoAvailableLocatorsException("Unable to query any locators");
@@ -263,11 +269,12 @@ GfErrType ThinClientLocatorHelper::getEndpointForNewFwdConn(
       }
       if (!conn->send(
               reinterpret_cast<char*>(const_cast<uint8_t*>(data.getBuffer())),
-              data.getBufferLength())) {
+              data.getBufferLength(), m_poolDM->getReadTimeout())) {
         continue;
       }
       char buff[BUFF_SIZE];
-      const auto receivedLength = conn->receive(buff);
+      const auto receivedLength =
+          conn->receive(buff, m_poolDM->getReadTimeout());
 
       if (!receivedLength) {
         continue;  // return GF_EUNDEF;
@@ -339,12 +346,13 @@ GfErrType ThinClientLocatorHelper::updateLocators(
       data.writeObject(request);
       if (!conn->send(
               reinterpret_cast<char*>(const_cast<uint8_t*>(data.getBuffer())),
-              data.getBufferLength())) {
+              data.getBufferLength(), m_poolDM->getReadTimeout())) {
         conn = nullptr;
         continue;
       }
       char buff[BUFF_SIZE];
-      const auto receivedLength = conn->receive(buff);
+      const auto receivedLength =
+          conn->receive(buff, m_poolDM->getReadTimeout());
 
       if (!receivedLength) {
         continue;
diff --git a/cppcache/src/ThinClientLocatorHelper.hpp b/cppcache/src/ThinClientLocatorHelper.hpp
index 1471592..0034b86 100644
--- a/cppcache/src/ThinClientLocatorHelper.hpp
+++ b/cppcache/src/ThinClientLocatorHelper.hpp
@@ -39,6 +39,7 @@ namespace client {
 
 class ThinClientPoolDM;
 class Connector;
+class TcrConnection;
 
 class ThinClientLocatorHelper {
  public:
diff --git a/cppcache/src/ThinClientPoolDM.cpp b/cppcache/src/ThinClientPoolDM.cpp
index 5602828..c4909ec 100644
--- a/cppcache/src/ThinClientPoolDM.cpp
+++ b/cppcache/src/ThinClientPoolDM.cpp
@@ -404,6 +404,7 @@ void ThinClientPoolDM::manageConnections(std::atomic<bool>& isRunning) {
       }
     }
   }
+
   LOGFINE("ThinClientPoolDM: ending manageConnections thread");
 }
 
@@ -432,8 +433,9 @@ void ThinClientPoolDM::cleanStaleConnections(std::atomic<bool>& isRunning) {
       removelist.push_back(conn);
     } else if (conn) {
       auto nextIdle =
-          _idle - std::chrono::duration_cast<std::chrono::milliseconds>(
-                      TcrConnection::clock::now() - conn->getLastAccessed());
+          _idle -
+          std::chrono::duration_cast<std::chrono::milliseconds>(
+              std::chrono::steady_clock::now() - conn->getLastAccessed());
       if (nextIdle > std::chrono::seconds::zero() && nextIdle < _nextIdle) {
         _nextIdle = nextIdle;
       }
@@ -452,7 +454,10 @@ void ThinClientPoolDM::cleanStaleConnections(std::atomic<bool>& isRunning) {
        iter != removelist.end(); ++iter) {
     TcrConnection* conn = *iter;
     if (replaceCount <= 0) {
-      GF_SAFE_DELETE_CON(conn);
+      try {
+        GF_SAFE_DELETE_CON(conn);
+      } catch (...) {
+      }
       removeEPConnections(1, false);
       getStats().incLoadCondDisconnects();
       LOGDEBUG("Removed a connection");
@@ -463,21 +468,28 @@ void ThinClientPoolDM::cleanStaleConnections(std::atomic<bool>& isRunning) {
                            /*hasExpired(conn) ? nullptr :*/ conn);
       if (newConn) {
         auto nextIdle =
-            _idle - std::chrono::duration_cast<std::chrono::milliseconds>(
-                        TcrConnection::clock::now() - conn->getLastAccessed());
+            _idle -
+            std::chrono::duration_cast<std::chrono::milliseconds>(
+                std::chrono::steady_clock::now() - conn->getLastAccessed());
         if (nextIdle > std::chrono::seconds::zero() && nextIdle < _nextIdle) {
           _nextIdle = nextIdle;
         }
         put(newConn, false);
         if (newConn != conn) {
-          GF_SAFE_DELETE_CON(conn);
+          try {
+            GF_SAFE_DELETE_CON(conn);
+          } catch (...) {
+          }
           removeEPConnections(1, false);
           getStats().incLoadCondDisconnects();
           LOGDEBUG("Removed a connection");
         }
       } else {
         if (hasExpired(conn)) {
-          GF_SAFE_DELETE_CON(conn);
+          try {
+            GF_SAFE_DELETE_CON(conn);
+          } catch (...) {
+          }
           removeEPConnections(1, false);
           getStats().incLoadCondDisconnects();
           LOGDEBUG("Removed a connection");
@@ -486,7 +498,7 @@ void ThinClientPoolDM::cleanStaleConnections(std::atomic<bool>& isRunning) {
           auto nextIdle =
               _idle -
               std::chrono::duration_cast<std::chrono::milliseconds>(
-                  TcrConnection::clock::now() - conn->getLastAccessed());
+                  std::chrono::steady_clock::now() - conn->getLastAccessed());
           if (nextIdle > std::chrono::seconds::zero() && nextIdle < _nextIdle) {
             _nextIdle = nextIdle;
           }
@@ -1444,7 +1456,10 @@ GfErrType ThinClientPoolDM::sendSyncRequest(
                         request.getMessageType() ==
                             TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP);
           if (conn) {
-            GF_SAFE_DELETE_CON(conn);
+            try {
+              GF_SAFE_DELETE_CON(conn);
+            } catch (...) {
+            }
           }
           excludeServers.insert(ServerLocation(ep->name()));
           if (error == GF_IOERR) {
@@ -2037,17 +2052,20 @@ void ThinClientPoolDM::pingServerLocal() {
 
 void ThinClientPoolDM::updateLocatorList(std::atomic<bool>& isRunning) {
   LOGFINE("Starting updateLocatorList thread for pool %s", m_poolName.c_str());
+
   while (isRunning) {
     m_updateLocatorListSema.acquire();
     if (isRunning && !m_connManager.isNetDown()) {
       (m_locHelper)->updateLocators(getServerGroup());
     }
   }
+
   LOGFINE("Ending updateLocatorList thread for pool %s", m_poolName.c_str());
 }
 
 void ThinClientPoolDM::pingServer(std::atomic<bool>& isRunning) {
   LOGFINE("Starting ping thread for pool %s", m_poolName.c_str());
+
   while (isRunning) {
     m_pingSema.acquire();
     if (isRunning && !m_connManager.isNetDown()) {
@@ -2055,6 +2073,7 @@ void ThinClientPoolDM::pingServer(std::atomic<bool>& isRunning) {
       m_pingSema.acquire();
     }
   }
+
   LOGFINE("Ending ping thread for pool %s", m_poolName.c_str());
 }
 

Reply via email to