This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch tubemq-client-cpp in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/tubemq-client-cpp by this push: new 2e2b007 [TUBEMQ-275]Thread Pool & Timer (#205) 2e2b007 is described below commit 2e2b0073a3d7323494f6c2ce4f40b61135f2ce68 Author: charlely <41224243+charl...@users.noreply.github.com> AuthorDate: Tue Jul 14 15:33:09 2020 +0800 [TUBEMQ-275]Thread Pool & Timer (#205) Co-authored-by: charleli <charl...@tencent.com> --- .../tubemq-client-cpp/CMakeLists.txt | 33 ++++---- tubemq-client-twins/tubemq-client-cpp/README.md | 33 ++++++++ .../tubemq-client-cpp/example/CMakeLists.txt | 3 +- .../{src => example/executor_pool}/CMakeLists.txt | 13 +--- .../singleton.h => example/executor_pool/main.cc} | 63 ++++++++-------- .../include/tubemq/executor_pool.h | 88 ++++++++++++++++++++++ .../tubemq-client-cpp/include/tubemq/singleton.h | 5 +- .../tubemq-client-cpp/src/CMakeLists.txt | 3 - .../tubemq-client-cpp/src/executor_pool.cc | 87 +++++++++++++++++++++ .../tubemq-client-cpp/third_party/README.md | 23 ++++++ .../tubemq-client-cpp/third_party/readme.md | 2 - 11 files changed, 288 insertions(+), 65 deletions(-) diff --git a/tubemq-client-twins/tubemq-client-cpp/CMakeLists.txt b/tubemq-client-twins/tubemq-client-cpp/CMakeLists.txt index db9014a..984a28a 100644 --- a/tubemq-client-twins/tubemq-client-cpp/CMakeLists.txt +++ b/tubemq-client-twins/tubemq-client-cpp/CMakeLists.txt @@ -22,6 +22,26 @@ cmake_minimum_required (VERSION 3.1) project (TubeMQ) +set (CXX_FLAGS + -g + -fPIC + -Wall + -D__STDC_FORMAT_MACROS + -Wno-unused-parameter + -Wno-unused-function + -Wunused-variable + -Wunused-value + -Wshadow + -Wcast-qual + -Wcast-align + -Wwrite-strings + -Wsign-compare + -Winvalid-pch + -fms-extensions + -Wfloat-equal + -Wextra + -std=c++11 + ) INCLUDE_DIRECTORIES(include) @@ -35,16 +55,3 @@ ADD_SUBDIRECTORY(third_party) ADD_SUBDIRECTORY(example) -if (UNIX) - SET(CMAKE_CXX_FLAGS_DEBUG "-O1 -g -ggdb -D_DEBUG") - SET(CMAKE_CXX_FLAGS_RELEASE "-O3 -g -ggdb -DNDEBUG") - SET(DEPENDENT_LIBRARIES log4cplus pthread) -else (UNIX) - SET(DEPENDENT_LIBRARIES log4cplus) -endif (UNIX) - - - - - - diff --git a/tubemq-client-twins/tubemq-client-cpp/README.md b/tubemq-client-twins/tubemq-client-cpp/README.md new file mode 100644 index 0000000..ecf0f6f --- /dev/null +++ b/tubemq-client-twins/tubemq-client-cpp/README.md @@ -0,0 +1,33 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> + + +# TubeMQ C++ client library +## Requirements + + * CMake + * [ASIO](https://github.com/chriskohlhoff/asio.git) + * [OpenSSL](https://github.com/openssl/openssl.git) + * [Protocol Buffer](https://developers.google.com/protocol-buffers/) + * [Log4cplus](https://github.com/log4cplus/log4cplus.git) + * [Rapidjson](https://github.com/Tencent/rapidjson.git) + + diff --git a/tubemq-client-twins/tubemq-client-cpp/example/CMakeLists.txt b/tubemq-client-twins/tubemq-client-cpp/example/CMakeLists.txt index 1d8fcdd..9d729c2 100644 --- a/tubemq-client-twins/tubemq-client-cpp/example/CMakeLists.txt +++ b/tubemq-client-twins/tubemq-client-cpp/example/CMakeLists.txt @@ -26,7 +26,8 @@ function(tubemq_add_example _name) set(_srcs ${ARGN}) message (STATUS "${_name} sources: ${_srcs}") add_executable (${_name} ${_srcs}) - TARGET_LINK_LIBRARIES (${_name} tubemq log4cplus pthread) + TARGET_LINK_LIBRARIES (${_name} tubemq ssl crypto log4cplus pthread) endfunction() add_subdirectory (log) +add_subdirectory (executor_pool) diff --git a/tubemq-client-twins/tubemq-client-cpp/src/CMakeLists.txt b/tubemq-client-twins/tubemq-client-cpp/example/executor_pool/CMakeLists.txt similarity index 67% copy from tubemq-client-twins/tubemq-client-cpp/src/CMakeLists.txt copy to tubemq-client-twins/tubemq-client-cpp/example/executor_pool/CMakeLists.txt index a6e76e5..bef5bbb 100644 --- a/tubemq-client-twins/tubemq-client-cpp/src/CMakeLists.txt +++ b/tubemq-client-twins/tubemq-client-cpp/example/executor_pool/CMakeLists.txt @@ -18,15 +18,4 @@ # -cmake_minimum_required (VERSION 3.1) - - -set(CMAKE_C_FLAGS "-O2 -g -Wall -Werror -Wsign-compare -fno-strict-aliasing -fPIC") -set(CMAKE_CXX_FLAGS "-std=c++11 -O2 -g -Wall -Werror -Wsign-compare -fno-strict-aliasing -fPIC") - -AUX_SOURCE_DIRECTORY(. CURRENT_DIR_SRCS) -ADD_LIBRARY(tubemq STATIC ${CURRENT_DIR_SRCS}) -TARGET_LINK_LIBRARIES (tubemq) - - - +tubemq_add_example(executor_pool main.cc) diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/singleton.h b/tubemq-client-twins/tubemq-client-cpp/example/executor_pool/main.cc similarity index 50% copy from tubemq-client-twins/tubemq-client-cpp/include/tubemq/singleton.h copy to tubemq-client-twins/tubemq-client-cpp/example/executor_pool/main.cc index fb22587..8c5ca25 100644 --- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/singleton.h +++ b/tubemq-client-twins/tubemq-client-cpp/example/executor_pool/main.cc @@ -17,42 +17,39 @@ * under the License. */ -#ifndef _TUBEMQ_SINGLETON_H -#define _TUBEMQ_SINGLETON_H - -#include <assert.h> -#include <stdlib.h> - -#include <mutex> +#include <chrono> +#include <exception> +#include <functional> +#include <iostream> +#include <string> #include <thread> -#include "tubemq/noncopyable.h" +#include "tubemq/executor_pool.h" -namespace tubemq { +using namespace std; +using namespace tubemq; -template <typename T> -class Singleton : noncopyable { - public: - Singleton() = delete; - ~Singleton() = delete; - - static T& instance() { - std::call_once(once_, Singleton::init); - assert(value_ != nullptr); - return *value_; +void handler(int a, const asio::error_code& error) { + if (!error) { + // Timer expired. + std::cout << "handlertimeout:" << a << endl; } +} + +int main() { + using namespace std::placeholders; // for _1, _2, _3... + ExecutorPool pool(4); + auto timer = pool.Get()->CreateSteadyTimer(); + timer->expires_after(std::chrono::seconds(5)); + std::cout << "startwait" << endl; + timer->wait(); + std::cout << "endwait" << endl; + + timer->expires_after(std::chrono::milliseconds(100)); + std::cout << "startsyncwait" << endl; + timer->async_wait(std::bind(handler, 5, _1)); + std::cout << "endsyncwait" << endl; + std::this_thread::sleep_for(5s); + return 0; +} - private: - static void init() { value_ = new T(); } - - private: - static std::once_flag once_; - static T* value_; -}; - -template <typename T> -T* Singleton<T>::value_ = nullptr; - -} // namespace tubemq - -#endif // _TUBEMQ_SINGLETON_H diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/executor_pool.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/executor_pool.h new file mode 100644 index 0000000..0dd1f66 --- /dev/null +++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/executor_pool.h @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _TUBEMQ_EXECUTOR_POOL_ +#define _TUBEMQ_EXECUTOR_POOL_ + +#include <stdlib.h> + +#include <asio.hpp> +#include <asio/ssl.hpp> +#include <functional> +#include <memory> +#include <mutex> +#include <thread> + +#include "tubemq/noncopyable.h" + +namespace tubemq { + +typedef std::shared_ptr<asio::ip::tcp::socket> SocketPtr; +typedef std::shared_ptr<asio::ssl::stream<asio::ip::tcp::socket &> > TlsSocketPtr; +typedef std::shared_ptr<asio::ip::tcp::resolver> TcpResolverPtr; +typedef std::shared_ptr<asio::steady_timer> SteadyTimerPtr; + +class Executor : noncopyable { + public: + Executor(); + ~Executor(); + + SocketPtr CreateSocket(); + TlsSocketPtr CreateTlsSocket(SocketPtr &socket, asio::ssl::context &ctx); + TcpResolverPtr CreateTcpResolver(); + SteadyTimerPtr CreateSteadyTimer(); + using func = std::function<void(void)>; + + void Post(func task); + + std::shared_ptr<asio::io_context> GetIoContext() { return io_context_; } + + // Close executor and exit thread + void Close(); + + private: + void StartWorker(std::shared_ptr<asio::io_context> io_context); + std::shared_ptr<asio::io_context> io_context_; + using io_context_work = asio::executor_work_guard<asio::io_context::executor_type>; + io_context_work work_; + std::thread worker_; +}; + +typedef std::shared_ptr<Executor> ExecutorPtr; + +class ExecutorPool : noncopyable { + public: + explicit ExecutorPool(int nthreads = 2); + + ExecutorPtr Get(); + + void Close(); + + private: + typedef std::vector<ExecutorPtr> ExecutorList; + ExecutorList executors_; + uint32_t executorIdx_; + std::mutex mutex_; + typedef std::unique_lock<std::mutex> Lock; +}; + +typedef std::shared_ptr<ExecutorPool> ExecutorPoolPtr; +} // namespace tubemq + +#endif //_TUBEMQ_EXECUTOR_POOL_ diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/singleton.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/singleton.h index fb22587..2761bda 100644 --- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/singleton.h +++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/singleton.h @@ -36,7 +36,7 @@ class Singleton : noncopyable { Singleton() = delete; ~Singleton() = delete; - static T& instance() { + static T& Instance() { std::call_once(once_, Singleton::init); assert(value_ != nullptr); return *value_; @@ -51,6 +51,9 @@ class Singleton : noncopyable { }; template <typename T> +std::once_flag Singleton<T>::once_; + +template <typename T> T* Singleton<T>::value_ = nullptr; } // namespace tubemq diff --git a/tubemq-client-twins/tubemq-client-cpp/src/CMakeLists.txt b/tubemq-client-twins/tubemq-client-cpp/src/CMakeLists.txt index a6e76e5..5995d5c 100644 --- a/tubemq-client-twins/tubemq-client-cpp/src/CMakeLists.txt +++ b/tubemq-client-twins/tubemq-client-cpp/src/CMakeLists.txt @@ -21,9 +21,6 @@ cmake_minimum_required (VERSION 3.1) -set(CMAKE_C_FLAGS "-O2 -g -Wall -Werror -Wsign-compare -fno-strict-aliasing -fPIC") -set(CMAKE_CXX_FLAGS "-std=c++11 -O2 -g -Wall -Werror -Wsign-compare -fno-strict-aliasing -fPIC") - AUX_SOURCE_DIRECTORY(. CURRENT_DIR_SRCS) ADD_LIBRARY(tubemq STATIC ${CURRENT_DIR_SRCS}) TARGET_LINK_LIBRARIES (tubemq) diff --git a/tubemq-client-twins/tubemq-client-cpp/src/executor_pool.cc b/tubemq-client-twins/tubemq-client-cpp/src/executor_pool.cc new file mode 100644 index 0000000..a2536ae --- /dev/null +++ b/tubemq-client-twins/tubemq-client-cpp/src/executor_pool.cc @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "tubemq/executor_pool.h" + +#include <asio.hpp> +#include <functional> +#include <memory> + +namespace tubemq { + +Executor::Executor() + : io_context_(new asio::io_context()), + work_(asio::make_work_guard(*io_context_)), + worker_(std::bind(&Executor::StartWorker, this, io_context_)) {} + +Executor::~Executor() { + Close(); + if (worker_.joinable()) { + worker_.detach(); + } +} + +void Executor::StartWorker(std::shared_ptr<asio::io_context> io_context) { io_context_->run(); } + +SocketPtr Executor::CreateSocket() { return SocketPtr(new asio::ip::tcp::socket(*io_context_)); } + +TlsSocketPtr Executor::CreateTlsSocket(SocketPtr &socket, asio::ssl::context &ctx) { + return std::shared_ptr<asio::ssl::stream<asio::ip::tcp::socket &> >( + new asio::ssl::stream<asio::ip::tcp::socket &>(*socket, ctx)); +} + +TcpResolverPtr Executor::CreateTcpResolver() { + return TcpResolverPtr(new asio::ip::tcp::resolver(*io_context_)); +} + +SteadyTimerPtr Executor::CreateSteadyTimer() { + return SteadyTimerPtr(new asio::steady_timer(*io_context_)); +} + +void Executor::Close() { + io_context_->stop(); + if (std::this_thread::get_id() != worker_.get_id() && worker_.joinable()) { + worker_.join(); + } +} + +void Executor::Post(Executor::func task) { io_context_->post(task); } + +ExecutorPool::ExecutorPool(int nthreads) : executors_(nthreads), executorIdx_(0), mutex_() {} + +ExecutorPtr ExecutorPool::Get() { + Lock lock(mutex_); + + int idx = executorIdx_++ % executors_.size(); + if (!executors_[idx]) { + executors_[idx] = std::make_shared<Executor>(); + } + + return executors_[idx]; +} + +void ExecutorPool::Close() { + for (auto it = executors_.begin(); it != executors_.end(); ++it) { + if (*it != nullptr) { + (*it)->Close(); + } + it->reset(); + } +} +} // namespace tubemq diff --git a/tubemq-client-twins/tubemq-client-cpp/third_party/README.md b/tubemq-client-twins/tubemq-client-cpp/third_party/README.md new file mode 100644 index 0000000..6497505 --- /dev/null +++ b/tubemq-client-twins/tubemq-client-cpp/third_party/README.md @@ -0,0 +1,23 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> + +#Third-party libraries# +tubemq-client-cpp depends on several third-party libraries, their source code is available (usually as a git submodule) in this directory. diff --git a/tubemq-client-twins/tubemq-client-cpp/third_party/readme.md b/tubemq-client-twins/tubemq-client-cpp/third_party/readme.md deleted file mode 100644 index e1752ca..0000000 --- a/tubemq-client-twins/tubemq-client-cpp/third_party/readme.md +++ /dev/null @@ -1,2 +0,0 @@ -#Third-party libraries# -tubemq-client-cpp depends on several third-party libraries, their source code is available (usually as a git submodule) in this directory. \ No newline at end of file