This is an automated email from the ASF dual-hosted git repository. bbender pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode-native.git
The following commit(s) were added to refs/heads/develop by this push: new 7453395 GEODE-7019: Fix closing of idle connections in C++ Native client (#504) 7453395 is described below commit 7453395b7f36a58cfa5a0c70610ea91d32435001 Author: Alberto Gomez <alberto.go...@est.tech> AuthorDate: Tue Aug 13 17:01:29 2019 +0200 GEODE-7019: Fix closing of idle connections in C++ Native client (#504) - Also clean up a couple of char* variables in internal APIs --- CONTRIBUTING.md | 12 +- .../testThinClientPoolAttrTest.cpp | 16 +-- cppcache/integration/test/CMakeLists.txt | 1 + cppcache/integration/test/CleanIdleConnections.cpp | 136 +++++++++++++++++++++ cppcache/src/CacheImpl.cpp | 2 +- cppcache/src/CacheImpl.hpp | 2 +- cppcache/src/PoolFactory.cpp | 17 +-- cppcache/src/ThinClientPoolDM.cpp | 133 ++++++++++---------- 8 files changed, 232 insertions(+), 87 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 1757e54..9510ca9 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -20,7 +20,7 @@ $ cd cppcache/test/<Debug|Release|if needed> $ ./apache-geode_unittests ``` -### Running integration tests +### Running old integration tests ```bash $ cd <clone> $ cd build @@ -41,6 +41,16 @@ $ ctest -R <test_name> -C <Debug|Release> ``` .NET integration tests can be executed similarly from `build/clicache/integration-test`. +### Running new integration tests +```bash +$ cd <clone> +$ cd build +$ cd cppcache/integration/test +$ ./cpp-integration-test [<options>] +``` +Note that <options> are gtest options that may be passed to the test executable, like for example the test cases to be run. Use --help to get all the available options. + + ## Style ### Formatting C++ diff --git a/cppcache/integration-test/testThinClientPoolAttrTest.cpp b/cppcache/integration-test/testThinClientPoolAttrTest.cpp index 40ab5b6..d3cb2a4 100644 --- a/cppcache/integration-test/testThinClientPoolAttrTest.cpp +++ b/cppcache/integration-test/testThinClientPoolAttrTest.cpp @@ -260,8 +260,8 @@ DUNIT_TASK(CLIENT1, ClientOp) // Check current # connections they should be == min std::string poolName = getHelper()->getRegion(poolRegNames[0])->getAttributes().getPoolName(); - int level = TestUtils::getCacheImpl(getHelper()->cachePtr) - ->getPoolSize(poolName.c_str()); + int level = + TestUtils::getCacheImpl(getHelper()->cachePtr)->getPoolSize(poolName); int min = getHelper() ->getCache() ->getPoolManager() @@ -281,8 +281,8 @@ DUNIT_TASK(CLIENT1, ClientOp) SLEEP(5000); // wait for threads to become active // Check current # connections they should be == max - level = TestUtils::getCacheImpl(getHelper()->cachePtr) - ->getPoolSize(poolName.c_str()); + level = + TestUtils::getCacheImpl(getHelper()->cachePtr)->getPoolSize(poolName); int max = getHelper() ->getCache() ->getPoolManager() @@ -301,8 +301,8 @@ DUNIT_TASK(CLIENT1, ClientOp) LOG("Waiting 25 sec for idle timeout to kick in"); SLEEP(25000); - level = TestUtils::getCacheImpl(getHelper()->cachePtr) - ->getPoolSize(poolName.c_str()); + level = + TestUtils::getCacheImpl(getHelper()->cachePtr)->getPoolSize(poolName); min = getHelper() ->getCache() ->getPoolManager() @@ -317,8 +317,8 @@ DUNIT_TASK(CLIENT1, ClientOp) LOG("Waiting 1 minute for load conditioning to kick in"); SLEEP(60000); - level = TestUtils::getCacheImpl(getHelper()->cachePtr) - ->getPoolSize(poolName.c_str()); + level = + TestUtils::getCacheImpl(getHelper()->cachePtr)->getPoolSize(poolName); sprintf(logmsg, "Pool level not equal to min level after load " "conditioning. Expected %d, actual %d", diff --git a/cppcache/integration/test/CMakeLists.txt b/cppcache/integration/test/CMakeLists.txt index 607fb08..a1d1134 100644 --- a/cppcache/integration/test/CMakeLists.txt +++ b/cppcache/integration/test/CMakeLists.txt @@ -35,6 +35,7 @@ add_executable(cpp-integration-test SimpleCqListener.hpp StructTest.cpp TransactionCleaningTest.cpp + CleanIdleConnections.cpp ) target_compile_definitions(cpp-integration-test diff --git a/cppcache/integration/test/CleanIdleConnections.cpp b/cppcache/integration/test/CleanIdleConnections.cpp new file mode 100644 index 0000000..de2ecbe --- /dev/null +++ b/cppcache/integration/test/CleanIdleConnections.cpp @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <thread> + +#include <gtest/gtest.h> + +#include <geode/Cache.hpp> +#include <geode/PoolManager.hpp> +#include <geode/RegionFactory.hpp> +#include <geode/RegionShortcut.hpp> + +#include "CacheImpl.hpp" +#include "CacheRegionHelper.hpp" +#include "framework/Cluster.h" +#include "framework/Framework.h" +#include "framework/Gfsh.h" + +namespace { + +using apache::geode::client::Cache; +using apache::geode::client::CacheImpl; +using apache::geode::client::CacheRegionHelper; +using apache::geode::client::Pool; +using apache::geode::client::Region; +using apache::geode::client::RegionShortcut; + +CacheImpl* getCacheImpl(Cache* cptr) { + return CacheRegionHelper::getCacheImpl(cptr); +} + +Cache createCache() { + using apache::geode::client::CacheFactory; + + auto cache = CacheFactory() + .set("log-level", "none") + .set("statistic-sampling-enabled", "false") + .create(); + + return cache; +} + +std::shared_ptr<Pool> createPool(Cluster& cluster, Cache& cache, + const int& minConns, const int& maxConns, + const std::string& poolName) { + auto poolFactory = cache.getPoolManager().createFactory(); + cluster.applyLocators(poolFactory); + poolFactory.setPRSingleHopEnabled(true); + poolFactory.setMinConnections(minConns); + poolFactory.setMaxConnections(maxConns); + return poolFactory.create(poolName); +} + +std::shared_ptr<Region> setupRegion(Cache& cache, + const std::shared_ptr<Pool>& pool) { + auto region = cache.createRegionFactory(RegionShortcut::PROXY) + .setPoolName(pool->getName()) + .create("region"); + + return region; +} + +void doGets(std::shared_ptr<Region> region, int entries) { + for (auto i = 0; i < entries; i++) { + region->get(i); + } +} + +TEST(CleanIdleConnectionsTest, cleanIdleConnectionsAfterOpsPaused) { + Cluster cluster{LocatorCount{1}, ServerCount{2}}; + cluster.getGfsh() + .create() + .region() + .withName("region") + .withType("PARTITION") + .execute(); + + auto cache = createCache(); + auto minConns = 1; + auto maxConns = -1; + std::string poolName = "default"; + auto pool = createPool(cluster, cache, minConns, maxConns, poolName); + auto region = setupRegion(cache, pool); + + int poolSize = getCacheImpl(&cache)->getPoolSize(poolName); + ASSERT_EQ(poolSize, 0); + + sleep(10); + + poolSize = getCacheImpl(&cache)->getPoolSize(poolName); + ASSERT_GE(poolSize, minConns); + + int entries = 10; + for (auto i = 0; i < entries; i++) { + region->put(i, "value"); + } + + std::vector<std::shared_ptr<std::thread>> tasks; + int threads = 10; + + for (int i = 0; i < threads; i++) { + std::shared_ptr<std::thread> threadAux = + std::make_shared<std::thread>(doGets, region, entries); + tasks.push_back(threadAux); + } + + for (int i = 0; i < threads; i++) { + tasks[i]->join(); + } + + poolSize = getCacheImpl(&cache)->getPoolSize(poolName); + ASSERT_GT(poolSize, minConns); + + // As the default clientIdleTimeout is 5 secs, after 10 seconds + // all idle connections must have been closed. + sleep(10); + + poolSize = getCacheImpl(&cache)->getPoolSize(poolName); + ASSERT_EQ(poolSize, minConns); +} + +} // namespace diff --git a/cppcache/src/CacheImpl.cpp b/cppcache/src/CacheImpl.cpp index 7853fdc..a2cce13 100644 --- a/cppcache/src/CacheImpl.cpp +++ b/cppcache/src/CacheImpl.cpp @@ -699,7 +699,7 @@ void CacheImpl::processMarker() { } } -int CacheImpl::getPoolSize(const char* poolName) { +int CacheImpl::getPoolSize(const std::string& poolName) { if (const auto pool = getPoolManager().find(poolName)) { if (const auto dm = std::dynamic_pointer_cast<ThinClientPoolDM>(pool)) { return dm->m_poolSize; diff --git a/cppcache/src/CacheImpl.hpp b/cppcache/src/CacheImpl.hpp index f9ca86f..25b01ae 100644 --- a/cppcache/src/CacheImpl.hpp +++ b/cppcache/src/CacheImpl.hpp @@ -253,7 +253,7 @@ class APACHE_GEODE_EXPORT CacheImpl : private NonCopyable, void processMarker(); // Pool helpers for unit tests - int getPoolSize(const char* poolName); + int getPoolSize(const std::string& poolName); bool getPdxIgnoreUnreadFields() { this->throwIfClosed(); diff --git a/cppcache/src/PoolFactory.cpp b/cppcache/src/PoolFactory.cpp index c9f82d7..5b25227 100644 --- a/cppcache/src/PoolFactory.cpp +++ b/cppcache/src/PoolFactory.cpp @@ -74,7 +74,7 @@ PoolFactory::PoolFactory(const Cache& cache) PoolFactory& PoolFactory::setFreeConnectionTimeout( std::chrono::milliseconds connectionTimeout) { if (connectionTimeout <= std::chrono::milliseconds::zero()) { - throw IllegalArgumentException("connectionTimeout must greater than 0."); + throw IllegalArgumentException("connectionTimeout must be greater than 0."); } m_attrs->setFreeConnectionTimeout(connectionTimeout); @@ -85,7 +85,7 @@ PoolFactory& PoolFactory::setLoadConditioningInterval( std::chrono::milliseconds loadConditioningInterval) { if (loadConditioningInterval < std::chrono::milliseconds::zero()) { throw IllegalArgumentException( - "loadConditioningInterval must greater than or equlal to 0."); + "loadConditioningInterval must be greater than or equal to 0."); } m_attrs->setLoadConditioningInterval(loadConditioningInterval); @@ -105,7 +105,7 @@ PoolFactory& PoolFactory::setThreadLocalConnections( PoolFactory& PoolFactory::setReadTimeout(std::chrono::milliseconds timeout) { if (timeout <= std::chrono::milliseconds::zero()) { - throw IllegalArgumentException("timeout must greater than 0."); + throw IllegalArgumentException("timeout must be greater than 0."); } m_attrs->setReadTimeout(timeout); @@ -126,7 +126,7 @@ PoolFactory& PoolFactory::setIdleTimeout( std::chrono::milliseconds idleTimeout) { if (idleTimeout < std::chrono::milliseconds::zero()) { throw IllegalArgumentException( - "idleTimeout must greater than or equlal to 0."); + "idleTimeout must be greater than or equal to 0."); } m_attrs->setIdleTimeout(idleTimeout); @@ -141,7 +141,7 @@ PoolFactory& PoolFactory::setRetryAttempts(int retryAttempts) { PoolFactory& PoolFactory::setPingInterval( std::chrono::milliseconds pingInterval) { if (pingInterval <= std::chrono::milliseconds::zero()) { - throw IllegalArgumentException("timeout must greater than 0."); + throw IllegalArgumentException("timeout must be greater than 0."); } m_attrs->setPingInterval(pingInterval); @@ -161,7 +161,8 @@ PoolFactory& PoolFactory::setUpdateLocatorListInterval( PoolFactory& PoolFactory::setStatisticInterval( std::chrono::milliseconds statisticInterval) { if (statisticInterval < std::chrono::milliseconds::zero()) { - throw IllegalArgumentException("timeout must greater than or equal to 0."); + throw IllegalArgumentException( + "timeout must be greater than or equal to 0."); } m_attrs->setStatisticInterval(statisticInterval); @@ -201,7 +202,7 @@ PoolFactory& PoolFactory::setSubscriptionRedundancy(int redundancy) { PoolFactory& PoolFactory::setSubscriptionMessageTrackingTimeout( std::chrono::milliseconds messageTrackingTimeout) { if (messageTrackingTimeout <= std::chrono::milliseconds::zero()) { - throw IllegalArgumentException("timeout must greater than 0."); + throw IllegalArgumentException("timeout must be greater than 0."); } m_attrs->setSubscriptionMessageTrackingTimeout(messageTrackingTimeout); @@ -211,7 +212,7 @@ PoolFactory& PoolFactory::setSubscriptionMessageTrackingTimeout( PoolFactory& PoolFactory::setSubscriptionAckInterval( std::chrono::milliseconds ackInterval) { if (ackInterval <= std::chrono::milliseconds::zero()) { - throw IllegalArgumentException("timeout must greater than 0."); + throw IllegalArgumentException("timeout must be greater than 0."); } m_attrs->setSubscriptionAckInterval(ackInterval); diff --git a/cppcache/src/ThinClientPoolDM.cpp b/cppcache/src/ThinClientPoolDM.cpp index 4eaaca9..7f798f6 100644 --- a/cppcache/src/ThinClientPoolDM.cpp +++ b/cppcache/src/ThinClientPoolDM.cpp @@ -408,52 +408,73 @@ void ThinClientPoolDM::cleanStaleConnections(std::atomic<bool>& isRunning) { auto _idle = getIdleTimeout(); auto _nextIdle = _idle; - { - TcrConnection* conn = nullptr; - std::vector<TcrConnection*> savelist; - std::vector<TcrConnection*> replacelist; - std::set<ServerLocation> excludeServers; + TcrConnection* conn = nullptr; - while ((conn = getNoWait()) != nullptr && isRunning) { - if (canItBeDeleted(conn)) { - replacelist.push_back(conn); - } else if (conn) { - auto nextIdle = - _idle - std::chrono::duration_cast<std::chrono::milliseconds>( - TcrConnection::clock::now() - conn->getLastAccessed()); - if (nextIdle > std::chrono::seconds::zero() && nextIdle < _nextIdle) { - _nextIdle = nextIdle; - } - savelist.push_back(conn); + std::vector<TcrConnection*> savelist; + std::vector<TcrConnection*> removelist; + std::set<ServerLocation> excludeServers; + + while ((conn = getNoWait()) != nullptr && isRunning) { + if (canItBeDeleted(conn)) { + removelist.push_back(conn); + } else if (conn) { + auto nextIdle = + _idle - std::chrono::duration_cast<std::chrono::milliseconds>( + TcrConnection::clock::now() - conn->getLastAccessed()); + if (nextIdle > std::chrono::seconds::zero() && nextIdle < _nextIdle) { + _nextIdle = nextIdle; } + savelist.push_back(conn); } + } - size_t replaceCount = - m_attrs->getMinConnections() - static_cast<int>(savelist.size()); + auto replaceCount = + m_attrs->getMinConnections() - static_cast<int>(savelist.size()); - LOGDEBUG("Preserving %d connections", savelist.size()); + LOGDEBUG("Preserving %d connections", savelist.size()); - for (auto savedconn : savelist) { - put(savedconn, false); - } - savelist.clear(); - int count = 0; + for (auto savedconn : savelist) { + put(savedconn, false); + } + savelist.clear(); + int count = 0; - for (std::vector<TcrConnection*>::const_iterator iter = replacelist.begin(); - iter != replacelist.end(); ++iter) { - TcrConnection* conn = *iter; - if (replaceCount <= 0) { - GF_SAFE_DELETE_CON(conn); - removeEPConnections(1, false); - getStats().incLoadCondDisconnects(); - LOGDEBUG("Removed a connection"); + for (std::vector<TcrConnection*>::const_iterator iter = removelist.begin(); + iter != removelist.end(); ++iter) { + TcrConnection* conn = *iter; + if (replaceCount <= 0) { + GF_SAFE_DELETE_CON(conn); + removeEPConnections(1, false); + getStats().incLoadCondDisconnects(); + LOGDEBUG("Removed a connection"); + } else { + TcrConnection* newConn = nullptr; + bool maxConnLimit = false; + createPoolConnection(newConn, excludeServers, maxConnLimit, + /*hasExpired(conn) ? nullptr :*/ conn); + if (newConn) { + auto nextIdle = + _idle - std::chrono::duration_cast<std::chrono::milliseconds>( + TcrConnection::clock::now() - conn->getLastAccessed()); + if (nextIdle > std::chrono::seconds::zero() && nextIdle < _nextIdle) { + _nextIdle = nextIdle; + } + put(newConn, false); + if (newConn != conn) { + GF_SAFE_DELETE_CON(conn); + removeEPConnections(1, false); + getStats().incLoadCondDisconnects(); + LOGDEBUG("Removed a connection"); + } } else { - TcrConnection* newConn = nullptr; - bool maxConnLimit = false; - createPoolConnection(newConn, excludeServers, maxConnLimit, - /*hasExpired(conn) ? nullptr :*/ conn); - if (newConn) { + if (hasExpired(conn)) { + GF_SAFE_DELETE_CON(conn); + removeEPConnections(1, false); + getStats().incLoadCondDisconnects(); + LOGDEBUG("Removed a connection"); + } else { + conn->updateCreationTime(); auto nextIdle = _idle - std::chrono::duration_cast<std::chrono::milliseconds>( @@ -461,40 +482,16 @@ void ThinClientPoolDM::cleanStaleConnections(std::atomic<bool>& isRunning) { if (nextIdle > std::chrono::seconds::zero() && nextIdle < _nextIdle) { _nextIdle = nextIdle; } - put(newConn, false); - if (newConn != conn) { - GF_SAFE_DELETE_CON(conn); - removeEPConnections(1, false); - getStats().incLoadCondDisconnects(); - LOGDEBUG("Removed a connection"); - } - } else { - if (hasExpired(conn)) { - GF_SAFE_DELETE_CON(conn); - removeEPConnections(1, false); - getStats().incLoadCondDisconnects(); - LOGDEBUG("Removed a connection"); - } else { - conn->updateCreationTime(); - auto nextIdle = - _idle - - std::chrono::duration_cast<std::chrono::milliseconds>( - TcrConnection::clock::now() - conn->getLastAccessed()); - if (nextIdle > std::chrono::seconds::zero() && - nextIdle < _nextIdle) { - _nextIdle = nextIdle; - } - put(conn, false); - } + put(conn, false); } } - replaceCount--; - count++; - if (count % 10 == 0) { - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - } } - replacelist.clear(); + replaceCount--; + count++; + + if (count % 10 == 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } } if (m_connManageTaskId >= 0 && isRunning &&