This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 0c29a884812247663168511d5c71b15257734f06 Author: Yunze Xu <[email protected]> AuthorDate: Wed Aug 25 14:19:49 2021 +0800 [C++] Make some clean up methods thread safe (#11762) * Make some close methods thread safe * Restore shutdown() in ClientImpl's destructor and check whether connection pool is closed (cherry picked from commit 098ba16c15e81dad84e5b03a6565f8bb9941ad7a) --- pulsar-client-cpp/lib/ClientImpl.cc | 13 ++++++++++++- pulsar-client-cpp/lib/ConnectionPool.cc | 14 +++++++++++++- pulsar-client-cpp/lib/ConnectionPool.h | 9 ++++++++- pulsar-client-cpp/lib/ExecutorService.cc | 7 +++++++ pulsar-client-cpp/lib/ExecutorService.h | 3 +++ pulsar-client-cpp/lib/Producer.cc | 1 - pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc | 10 ++++++++++ pulsar-client-cpp/tests/BinaryLookupServiceTest.cc | 2 -- pulsar-client-cpp/tests/CustomLoggerTest.cc | 4 ++-- pulsar-client-cpp/tests/MessageTest.cc | 2 -- pulsar-client-cpp/tests/ReaderConfigurationTest.cc | 3 --- 11 files changed, 55 insertions(+), 13 deletions(-) diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc index 5abe6ec..613a979 100644 --- a/pulsar-client-cpp/lib/ClientImpl.cc +++ b/pulsar-client-cpp/lib/ClientImpl.cc @@ -560,10 +560,21 @@ void ClientImpl::shutdown() { } } - pool_.close(); + if (producers.size() + consumers.size() > 0) { + LOG_DEBUG(producers.size() << " producers and " << consumers.size() + << " consumers have been shutdown."); + } + if (!pool_.close()) { + // pool_ has already been closed. It means shutdown() has been called before. + return; + } + LOG_DEBUG("ConnectionPool is closed"); ioExecutorProvider_->close(); + LOG_DEBUG("ioExecutorProvider_ is closed"); listenerExecutorProvider_->close(); + LOG_DEBUG("listenerExecutorProvider_ is closed"); partitionListenerExecutorProvider_->close(); + LOG_DEBUG("partitionListenerExecutorProvider_ is closed"); } uint64_t ClientImpl::newProducerId() { diff --git a/pulsar-client-cpp/lib/ConnectionPool.cc b/pulsar-client-cpp/lib/ConnectionPool.cc index bb4f5c2..e03697f 100644 --- a/pulsar-client-cpp/lib/ConnectionPool.cc +++ b/pulsar-client-cpp/lib/ConnectionPool.cc @@ -41,7 +41,12 @@ ConnectionPool::ConnectionPool(const ClientConfiguration& conf, ExecutorServiceP poolConnections_(poolConnections), mutex_() {} -void ConnectionPool::close() { +bool ConnectionPool::close() { + bool expectedState = false; + if (!closed_.compare_exchange_strong(expectedState, true)) { + return false; + } + std::unique_lock<std::mutex> lock(mutex_); if (poolConnections_) { for (auto cnxIt = pool_.begin(); cnxIt != pool_.end(); cnxIt++) { @@ -52,10 +57,17 @@ void ConnectionPool::close() { } pool_.clear(); } + return true; } Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync( const std::string& logicalAddress, const std::string& physicalAddress) { + if (closed_) { + Promise<Result, ClientConnectionWeakPtr> promise; + promise.setFailed(ResultAlreadyClosed); + return promise.getFuture(); + } + std::unique_lock<std::mutex> lock(mutex_); if (poolConnections_) { diff --git a/pulsar-client-cpp/lib/ConnectionPool.h b/pulsar-client-cpp/lib/ConnectionPool.h index 8b35044..5032e80 100644 --- a/pulsar-client-cpp/lib/ConnectionPool.h +++ b/pulsar-client-cpp/lib/ConnectionPool.h @@ -24,6 +24,7 @@ #include "ClientConnection.h" +#include <atomic> #include <string> #include <map> #include <mutex> @@ -36,7 +37,12 @@ class PULSAR_PUBLIC ConnectionPool { ConnectionPool(const ClientConfiguration& conf, ExecutorServiceProviderPtr executorProvider, const AuthenticationPtr& authentication, bool poolConnections = true); - void close(); + /** + * Close the connection pool. + * + * @return false if it has already been closed. + */ + bool close(); /** * Get a connection from the pool. @@ -65,6 +71,7 @@ class PULSAR_PUBLIC ConnectionPool { PoolMap pool_; bool poolConnections_; std::mutex mutex_; + std::atomic_bool closed_{false}; friend class ConnectionPoolTest; }; diff --git a/pulsar-client-cpp/lib/ExecutorService.cc b/pulsar-client-cpp/lib/ExecutorService.cc index f7cb010..4db3112 100644 --- a/pulsar-client-cpp/lib/ExecutorService.cc +++ b/pulsar-client-cpp/lib/ExecutorService.cc @@ -62,6 +62,11 @@ DeadlineTimerPtr ExecutorService::createDeadlineTimer() { } void ExecutorService::close() { + bool expectedState = false; + if (!closed_.compare_exchange_strong(expectedState, true)) { + return; + } + io_service_->stop(); work_.reset(); // Detach the worker thread instead of join to avoid potential deadlock @@ -95,6 +100,8 @@ ExecutorServicePtr ExecutorServiceProvider::get() { } void ExecutorServiceProvider::close() { + Lock lock(mutex_); + for (ExecutorList::iterator it = executors_.begin(); it != executors_.end(); ++it) { if (*it != NULL) { (*it)->close(); diff --git a/pulsar-client-cpp/lib/ExecutorService.h b/pulsar-client-cpp/lib/ExecutorService.h index d0ffc23..6746936 100644 --- a/pulsar-client-cpp/lib/ExecutorService.h +++ b/pulsar-client-cpp/lib/ExecutorService.h @@ -19,6 +19,7 @@ #ifndef _PULSAR_EXECUTOR_SERVICE_HEADER_ #define _PULSAR_EXECUTOR_SERVICE_HEADER_ +#include <atomic> #include <memory> #include <boost/asio.hpp> #include <boost/asio/ssl.hpp> @@ -73,6 +74,8 @@ class PULSAR_PUBLIC ExecutorService : private boost::noncopyable { * io_service */ std::thread worker_; + + std::atomic_bool closed_{false}; }; typedef std::shared_ptr<ExecutorService> ExecutorServicePtr; diff --git a/pulsar-client-cpp/lib/Producer.cc b/pulsar-client-cpp/lib/Producer.cc index acd021b..ad60828 100644 --- a/pulsar-client-cpp/lib/Producer.cc +++ b/pulsar-client-cpp/lib/Producer.cc @@ -24,7 +24,6 @@ #include "ProducerImpl.h" namespace pulsar { -DECLARE_LOG_OBJECT() static const std::string EMPTY_STRING; diff --git a/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc b/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc index 5c37f48..919536f 100644 --- a/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc +++ b/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc @@ -38,7 +38,17 @@ #include <boost/property_tree/ptree.hpp> namespace ptree = boost::property_tree; +#if defined(__clang__) +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Wunknown-warning-option" +#endif + #include <boost/xpressive/xpressive.hpp> + +#if defined(__clang__) +#pragma clang diagnostic pop +#endif + #include <boost/archive/iterators/base64_from_binary.hpp> #include <boost/archive/iterators/transform_width.hpp> diff --git a/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc b/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc index 11cc053..b880df3 100644 --- a/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc +++ b/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc @@ -27,8 +27,6 @@ #include <pulsar/Authentication.h> #include <boost/exception/all.hpp> -DECLARE_LOG_OBJECT() - using namespace pulsar; TEST(BinaryLookupServiceTest, basicLookup) { diff --git a/pulsar-client-cpp/tests/CustomLoggerTest.cc b/pulsar-client-cpp/tests/CustomLoggerTest.cc index ec83e42..0b4e76a 100644 --- a/pulsar-client-cpp/tests/CustomLoggerTest.cc +++ b/pulsar-client-cpp/tests/CustomLoggerTest.cc @@ -56,7 +56,7 @@ TEST(CustomLoggerTest, testCustomLogger) { // reset to previous log factory Client client("pulsar://localhost:6650", clientConfig); client.close(); - ASSERT_EQ(logLines.size(), 3); + ASSERT_EQ(logLines.size(), 7); LogUtils::resetLoggerFactory(); }); testThread.join(); @@ -65,7 +65,7 @@ TEST(CustomLoggerTest, testCustomLogger) { Client client("pulsar://localhost:6650", clientConfig); client.close(); // custom logger didn't get any new lines - ASSERT_EQ(logLines.size(), 3); + ASSERT_EQ(logLines.size(), 7); } TEST(CustomLoggerTest, testConsoleLoggerFactory) { diff --git a/pulsar-client-cpp/tests/MessageTest.cc b/pulsar-client-cpp/tests/MessageTest.cc index 246203c..3c728c9 100644 --- a/pulsar-client-cpp/tests/MessageTest.cc +++ b/pulsar-client-cpp/tests/MessageTest.cc @@ -22,8 +22,6 @@ #include <string> #include <lib/LogUtils.h> -DECLARE_LOG_OBJECT() - using namespace pulsar; TEST(MessageTest, testMessageContents) { MessageBuilder msgBuilder1; diff --git a/pulsar-client-cpp/tests/ReaderConfigurationTest.cc b/pulsar-client-cpp/tests/ReaderConfigurationTest.cc index ccbfa2d..8dc60f4 100644 --- a/pulsar-client-cpp/tests/ReaderConfigurationTest.cc +++ b/pulsar-client-cpp/tests/ReaderConfigurationTest.cc @@ -22,12 +22,9 @@ */ #include <gtest/gtest.h> #include <pulsar/Client.h> -#include <lib/LogUtils.h> #include <lib/ReaderImpl.h> #include "NoOpsCryptoKeyReader.h" -DECLARE_LOG_OBJECT() - using namespace pulsar; static const std::string lookupUrl = "pulsar://localhost:6650";
